Skip to content

Commit a94c2d1

Browse files
authored
Merge pull request kmesh-net#570 from Okabe-Rintarou-0/feat/cluster_conn
maintain cluster active connection counter for circuit breaker
2 parents 6e56261 + e9af645 commit a94c2d1

36 files changed

+501
-109
lines changed

api/cluster/cluster.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ message Cluster {
1616

1717
core.ApiStatus api_status = 128;
1818
string name = 1;
19+
uint32 id = 2;
1920
uint32 connect_timeout = 4;
2021
LbPolicy lb_policy = 6;
2122

api/v2-c/cluster/cluster.pb-c.c

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ const ProtobufCEnumDescriptor cluster__cluster__lb_policy__descriptor =
8282
cluster__cluster__lb_policy__value_ranges,
8383
NULL,NULL,NULL,NULL /* reserved[1234] */
8484
};
85-
static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] =
85+
static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[7] =
8686
{
8787
{
8888
"name",
@@ -96,6 +96,18 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] =
9696
0, /* flags */
9797
0,NULL,NULL /* reserved1,reserved2, etc */
9898
},
99+
{
100+
"id",
101+
2,
102+
PROTOBUF_C_LABEL_NONE,
103+
PROTOBUF_C_TYPE_UINT32,
104+
0, /* quantifier_offset */
105+
offsetof(Cluster__Cluster, id),
106+
NULL,
107+
NULL,
108+
0, /* flags */
109+
0,NULL,NULL /* reserved1,reserved2, etc */
110+
},
99111
{
100112
"connect_timeout",
101113
4,
@@ -158,22 +170,23 @@ static const ProtobufCFieldDescriptor cluster__cluster__field_descriptors[6] =
158170
},
159171
};
160172
static const unsigned cluster__cluster__field_indices_by_name[] = {
161-
5, /* field[5] = api_status */
162-
3, /* field[3] = circuit_breakers */
163-
1, /* field[1] = connect_timeout */
164-
2, /* field[2] = lb_policy */
165-
4, /* field[4] = load_assignment */
173+
6, /* field[6] = api_status */
174+
4, /* field[4] = circuit_breakers */
175+
2, /* field[2] = connect_timeout */
176+
1, /* field[1] = id */
177+
3, /* field[3] = lb_policy */
178+
5, /* field[5] = load_assignment */
166179
0, /* field[0] = name */
167180
};
168181
static const ProtobufCIntRange cluster__cluster__number_ranges[6 + 1] =
169182
{
170183
{ 1, 0 },
171-
{ 4, 1 },
172-
{ 6, 2 },
173-
{ 10, 3 },
174-
{ 33, 4 },
175-
{ 128, 5 },
176-
{ 0, 6 }
184+
{ 4, 2 },
185+
{ 6, 3 },
186+
{ 10, 4 },
187+
{ 33, 5 },
188+
{ 128, 6 },
189+
{ 0, 7 }
177190
};
178191
const ProtobufCMessageDescriptor cluster__cluster__descriptor =
179192
{
@@ -183,7 +196,7 @@ const ProtobufCMessageDescriptor cluster__cluster__descriptor =
183196
"Cluster__Cluster",
184197
"cluster",
185198
sizeof(Cluster__Cluster),
186-
6,
199+
7,
187200
cluster__cluster__field_descriptors,
188201
cluster__cluster__field_indices_by_name,
189202
6, cluster__cluster__number_ranges,

api/v2-c/cluster/cluster.pb-c.h

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2/cluster/cluster.pb.go

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */
2+
/* Copyright Authors of Kmesh */
3+
4+
#include "bpf_log.h"
5+
#include "kmesh_common.h"
6+
#include "bpf_common.h"
7+
8+
#ifndef __KMESH_CIRCUIT_BREAKER_H__
9+
#define __KMESH_CIRCUIT_BREAKER_H__
10+
11+
#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN
12+
13+
#pragma pack(1)
14+
struct cluster_stats {
15+
__u32 active_connections;
16+
};
17+
18+
struct cluster_stats_key {
19+
__u64 netns_cookie;
20+
__u32 cluster_id;
21+
};
22+
#pragma pack()
23+
24+
struct {
25+
__uint(type, BPF_MAP_TYPE_HASH);
26+
__uint(key_size, sizeof(struct cluster_stats_key));
27+
__uint(value_size, sizeof(struct cluster_stats));
28+
__uint(map_flags, BPF_F_NO_PREALLOC);
29+
__uint(max_entries, MAP_SIZE_OF_CLUSTER);
30+
} map_of_cluster_stats SEC(".maps");
31+
32+
struct cluster_sock_data {
33+
__u32 cluster_id;
34+
};
35+
36+
struct {
37+
__uint(type, BPF_MAP_TYPE_SK_STORAGE);
38+
__uint(map_flags, BPF_F_NO_PREALLOC);
39+
__type(key, int);
40+
__type(value, struct cluster_sock_data);
41+
} map_of_cluster_sock SEC(".maps");
42+
43+
static inline void update_cluster_active_connections(const struct cluster_stats_key *key, int delta)
44+
{
45+
struct cluster_stats *stats = NULL;
46+
if (!key) {
47+
return;
48+
}
49+
stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key);
50+
if (!stats) {
51+
struct cluster_stats init_value = {0};
52+
bpf_map_update_elem(&map_of_cluster_stats, key, &init_value, BPF_NOEXIST);
53+
stats = kmesh_map_lookup_elem(&map_of_cluster_stats, key);
54+
}
55+
56+
if (!stats) {
57+
BPF_LOG(ERR, CIRCUIT_BREAKER, "failed to get cluster stats");
58+
return;
59+
}
60+
if (delta < 0 && -delta > stats->active_connections) {
61+
BPF_LOG(ERR, CIRCUIT_BREAKER, "invalid delta update");
62+
return;
63+
}
64+
65+
__sync_fetch_and_add(&stats->active_connections, delta);
66+
67+
BPF_LOG(
68+
DEBUG,
69+
CIRCUIT_BREAKER,
70+
"update existing stats(netns_cookie = %lld, cluster_id = %ld), "
71+
"current active connections: %d",
72+
key->netns_cookie,
73+
key->cluster_id,
74+
stats->active_connections);
75+
}
76+
77+
static inline int on_cluster_sock_bind(ctx_buff_t *ctx, const Cluster__Cluster *cluster)
78+
{
79+
__u32 cluster_id = cluster->id;
80+
struct cluster_stats_key key = {0};
81+
__u64 cookie = bpf_get_netns_cookie(ctx);
82+
key.cluster_id = cluster_id;
83+
key.netns_cookie = cookie;
84+
struct cluster_stats *stats = NULL;
85+
stats = kmesh_map_lookup_elem(&map_of_cluster_stats, &key);
86+
87+
if (stats != NULL) {
88+
Cluster__CircuitBreakers *cbs = NULL;
89+
cbs = kmesh_get_ptr_val(cluster->circuit_breakers);
90+
if (cbs != NULL && stats->active_connections >= cbs->max_connections) {
91+
BPF_LOG(
92+
DEBUG,
93+
CIRCUIT_BREAKER,
94+
"Current active connections %d exceeded max connections "
95+
"%d, reject connection",
96+
stats->active_connections,
97+
cbs->max_connections);
98+
return -1;
99+
}
100+
}
101+
102+
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock bind for cluster id = %ld", cluster_id);
103+
104+
struct cluster_sock_data *data = NULL;
105+
if (!ctx->sk) {
106+
BPF_LOG(WARN, CIRCUIT_BREAKER, "provided sock is NULL");
107+
return 0;
108+
}
109+
data = bpf_sk_storage_get(&map_of_cluster_sock, ctx->sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE);
110+
if (!data) {
111+
BPF_LOG(ERR, CIRCUIT_BREAKER, "on_cluster_sock_bind call bpf_sk_storage_get failed");
112+
return 0;
113+
}
114+
data->cluster_id = cluster_id;
115+
return 0;
116+
}
117+
118+
static inline struct cluster_sock_data *get_cluster_sk_data(struct bpf_sock *sk)
119+
{
120+
struct cluster_sock_data *data = NULL;
121+
if (!sk) {
122+
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "provided sock is NULL");
123+
return NULL;
124+
}
125+
126+
data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, 0);
127+
return data;
128+
}
129+
130+
static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx)
131+
{
132+
if (!ctx) {
133+
return;
134+
}
135+
struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk);
136+
if (!data) {
137+
return;
138+
}
139+
__u64 cookie = bpf_get_netns_cookie(ctx);
140+
struct cluster_stats_key key = {0};
141+
key.netns_cookie = cookie;
142+
key.cluster_id = data->cluster_id;
143+
BPF_LOG(
144+
DEBUG,
145+
CIRCUIT_BREAKER,
146+
"increase cluster active connections(netns_cookie = %lld, cluster "
147+
"id = %ld)",
148+
key.netns_cookie,
149+
key.cluster_id);
150+
update_cluster_active_connections(&key, 1);
151+
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock connection for cluster id = %ld", data->cluster_id);
152+
}
153+
154+
static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx)
155+
{
156+
if (!ctx) {
157+
return;
158+
}
159+
struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk);
160+
if (!data) {
161+
return;
162+
}
163+
__u64 cookie = bpf_get_netns_cookie(ctx);
164+
struct cluster_stats_key key = {0};
165+
key.netns_cookie = cookie;
166+
key.cluster_id = data->cluster_id;
167+
update_cluster_active_connections(&key, -1);
168+
BPF_LOG(
169+
DEBUG,
170+
CIRCUIT_BREAKER,
171+
"decrease cluster active connections(netns_cookie = %lld, cluster "
172+
"id = %ld)",
173+
key.netns_cookie,
174+
key.cluster_id);
175+
BPF_LOG(DEBUG, CIRCUIT_BREAKER, "record sock close for cluster id = %ld", data->cluster_id);
176+
}
177+
178+
#endif

bpf/kmesh/ads/include/cluster.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "tail_call.h"
1010
#include "cluster/cluster.pb-c.h"
1111
#include "endpoint/endpoint.pb-c.h"
12+
#include "circuit_breaker.h"
1213

1314
#define CLUSTER_NAME_MAX_LEN BPF_DATA_MAX_LEN
1415

@@ -263,7 +264,7 @@ static inline int cluster_handle_loadbalance(Cluster__Cluster *cluster, address_
263264

264265
name = kmesh_get_ptr_val(cluster->name);
265266
if (!name) {
266-
BPF_LOG(ERR, CLUSTER, "filed to get cluster\n");
267+
BPF_LOG(ERR, CLUSTER, "failed to get cluster\n");
267268
return -EAGAIN;
268269
}
269270

@@ -316,6 +317,12 @@ int cluster_manager(ctx_buff_t *ctx)
316317
if (cluster == NULL)
317318
return KMESH_TAIL_CALL_RET(ENOENT);
318319

320+
ret = on_cluster_sock_bind(ctx, cluster);
321+
if (ret) {
322+
// open circuit breaker, should reject here.
323+
MARK_REJECTED(ctx);
324+
return KMESH_TAIL_CALL_RET(ret);
325+
}
319326
ret = cluster_handle_loadbalance(cluster, &addr, ctx);
320327
return KMESH_TAIL_CALL_RET(ret);
321328
}

bpf/kmesh/ads/include/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#define map_of_virtual_host kmesh_virtual_host
4747
#define map_of_route kmesh_route
4848
#define map_of_cluster kmesh_cluster
49+
#define map_of_cluster_stats kmesh_cluster_stats
4950
#define map_of_loadbalance kmesh_loadbalance
5051
#define map_of_endpoint kmesh_endpoint
5152
#define map_of_tail_call_prog kmesh_tail_call_prog

bpf/kmesh/ads/include/ctx/sock_addr.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ typedef struct bpf_sock_addr ctx_buff_t;
2121
(ctx)->user_ip4 = (address)->ipv4; \
2222
(ctx)->user_port = (address)->port
2323

24+
#define MARK_REJECTED(ctx)
25+
2426
#endif //__BPF_CTX_SOCK_ADDR_H

bpf/kmesh/ads/include/ctx/sock_ops.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ typedef struct bpf_sock_ops ctx_buff_t;
2323
#define SET_CTX_ADDRESS(ctx, address) \
2424
(ctx)->remote_ip4 = (address)->ipv4; \
2525
(ctx)->remote_port = (address)->port
26+
27+
#define MARK_REJECTED(ctx) \
28+
BPF_LOG(DEBUG, KMESH, "mark reject"); \
29+
(ctx)->remote_ip4 = 0; \
30+
(ctx)->remote_port = 0
31+
#else
32+
#define MARK_REJECTED(ctx)
2633
#endif
2734

2835
#endif //__BPF_CTX_SOCK_OPS_H

bpf/kmesh/ads/include/kmesh_common.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111
#include "core/address.pb-c.h"
1212
#include "tail_call_index.h"
1313

14-
#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON
15-
#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON
16-
#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON
17-
#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON
18-
#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON
19-
#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON
20-
#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON
14+
#define BPF_LOGTYPE_LISTENER BPF_DEBUG_ON
15+
#define BPF_LOGTYPE_FILTERCHAIN BPF_DEBUG_ON
16+
#define BPF_LOGTYPE_FILTER BPF_DEBUG_ON
17+
#define BPF_LOGTYPE_CLUSTER BPF_DEBUG_ON
18+
#define BPF_LOGTYPE_ROUTER BPF_DEBUG_ON
19+
#define BPF_LOGTYPE_ROUTER_CONFIG BPF_DEBUG_ON
20+
#define BPF_LOGTYPE_COMMON BPF_DEBUG_ON
21+
#define BPF_LOGTYPE_CIRCUIT_BREAKER BPF_DEBUG_ON
2122

2223
#define BPF_OK 1
2324

0 commit comments

Comments
 (0)