Skip to content

Commit 6e56261

Browse files
authored
Merge pull request kmesh-net#900 from derekwin/lb-dev-09
Add locality load balancing to kmesh workload mode.
2 parents de27c26 + 6160b57 commit 6e56261

File tree

12 files changed

+887
-71
lines changed

12 files changed

+887
-71
lines changed

bpf/kmesh/workload/include/service.h

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 servic
1717
int ret = 0;
1818
endpoint_key endpoint_k = {0};
1919
endpoint_value *endpoint_v = NULL;
20+
int rand_k = 0;
21+
22+
if (service_v->prio_endpoint_count[0] == 0)
23+
return 0;
2024

2125
endpoint_k.service_id = service_id;
22-
endpoint_k.backend_index = bpf_get_prandom_u32() % service_v->endpoint_count + 1;
26+
endpoint_k.prio = 0; // for random handle,all endpoints are saved with highest priority
27+
28+
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[0] + 1;
29+
endpoint_k.backend_index = rand_k;
2330

2431
endpoint_v = map_lookup_endpoint(&endpoint_k);
2532
if (!endpoint_v) {
@@ -37,6 +44,50 @@ static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 servic
3744
return 0;
3845
}
3946

47+
static inline int
48+
lb_locality_failover_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v, bool is_strict)
49+
{
50+
int ret = 0;
51+
uint32_t rand_k = 0;
52+
endpoint_key endpoint_k = {0};
53+
endpoint_value *endpoint_v = NULL;
54+
endpoint_k.service_id = service_id;
55+
struct ip_addr zero_addr = {0};
56+
__u32 zero_port = 0;
57+
58+
// #pragma unroll
59+
for (int match_prio = 0; match_prio < PRIO_COUNT; match_prio++) {
60+
endpoint_k.prio = match_prio; // 0->6
61+
// if we have endpoints in this prio
62+
if (service_v->prio_endpoint_count[match_prio] > 0) {
63+
rand_k = bpf_get_prandom_u32() % service_v->prio_endpoint_count[match_prio] + 1;
64+
endpoint_k.backend_index = rand_k;
65+
endpoint_v = map_lookup_endpoint(&endpoint_k);
66+
if (!endpoint_v) {
67+
BPF_LOG(
68+
ERR, SERVICE, "find endpoint [%u/%u/%u] failed", service_id, match_prio, endpoint_k.backend_index);
69+
return -ENOENT;
70+
}
71+
ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v);
72+
if (ret != 0) {
73+
if (ret != -ENOENT)
74+
BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret);
75+
return ret;
76+
}
77+
BPF_LOG(DEBUG, SERVICE, "locality loadbalance matched backend_uid %d\n", endpoint_v->backend_uid);
78+
return 0; // find the backend successfully
79+
}
80+
if (is_strict) { // only match max priority in strict mode
81+
kmesh_ctx->dnat_ip = zero_addr;
82+
kmesh_ctx->dnat_port = zero_port;
83+
BPF_LOG(DEBUG, SERVICE, "locality loadbalance match nothing in STRICT mode\n");
84+
return -ENOENT;
85+
}
86+
}
87+
// no backend matched
88+
return -ENOENT;
89+
}
90+
4091
static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
4192
{
4293
int ret = 0;
@@ -55,15 +106,18 @@ static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service
55106
return ret;
56107
}
57108

58-
if (service_v->endpoint_count == 0) {
59-
BPF_LOG(DEBUG, SERVICE, "service %u has no endpoint", service_id);
60-
return 0;
61-
}
109+
BPF_LOG(DEBUG, SERVICE, "service [%u] policy [%u] failed", service_id, service_v->lb_policy);
62110

63111
switch (service_v->lb_policy) {
64112
case LB_POLICY_RANDOM:
65113
ret = lb_random_handle(kmesh_ctx, service_id, service_v);
66114
break;
115+
case LB_POLICY_STRICT:
116+
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, true);
117+
break;
118+
case LB_POLICY_FAILOVER:
119+
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v, false);
120+
break;
67121
default:
68122
BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
69123
ret = -EINVAL;

bpf/kmesh/workload/include/workload.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#define MAX_PORT_COUNT 10
1111
#define MAX_SERVICE_COUNT 10
1212
#define RINGBUF_SIZE (1 << 12)
13+
#define PRIO_COUNT 7
1314
#define MAX_MEMBER_NUM_PER_POLICY 4
1415

1516
#pragma pack(1)
@@ -28,8 +29,9 @@ typedef struct {
2829
} service_key;
2930

3031
typedef struct {
31-
__u32 endpoint_count; // endpoint count of current service
32-
__u32 lb_policy; // load balancing algorithm, currently only supports random algorithm
32+
__u32 prio_endpoint_count[PRIO_COUNT]; // endpoint count of current service with prio
33+
__u32 lb_policy; // load balancing algorithm, currently supports random algorithm, locality loadbalance
34+
// Failover/strict mode
3335
__u32 service_port[MAX_PORT_COUNT]; // service_port[i] and target_port[i] are a pair, i starts from 0 and max value
3436
// is MAX_PORT_COUNT-1
3537
__u32 target_port[MAX_PORT_COUNT];
@@ -40,6 +42,7 @@ typedef struct {
4042
// endpoint map
4143
typedef struct {
4244
__u32 service_id; // service id
45+
__u32 prio; // 0 means heightest prio, match all scope, 6 means lowest prio.
4346
__u32 backend_index; // if endpoint_count = 3, then backend_index = 0/1/2
4447
} endpoint_key;
4548

bpf/kmesh/workload/include/workload_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
// loadbalance type
typedef enum {
    LB_POLICY_RANDOM = 0,   // pick a random endpoint from the highest priority group
    LB_POLICY_STRICT = 1,   // locality match on the highest priority group only; miss -> no backend
    LB_POLICY_FAILOVER = 2, // locality match, falling back through lower priority groups
} lb_policy_t;
2628

2729
#pragma pack(1)

pkg/controller/workload/bpfcache/endpoint.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,13 @@ import (
2323
"istio.io/istio/pkg/util/sets"
2424
)
2525

26+
const (
	// PrioCount is the number of locality priority groups; it must stay in
	// sync with PRIO_COUNT in bpf/kmesh/workload/include/workload.h.
	PrioCount = 7
)

// EndpointKey mirrors the C endpoint_key BPF map key (three __u32 fields).
type EndpointKey struct {
	ServiceId    uint32 // service id
	Prio         uint32 // locality priority group: 0 is highest, PrioCount-1 is lowest
	BackendIndex uint32 // if endpoint_count = 3, then backend_index = 1/2/3
}
3035

@@ -65,15 +70,17 @@ func (c *Cache) EndpointDelete(key *EndpointKey) error {
6570
}
6671

6772
// EndpointSwap update the last endpoint index and remove the current endpoint
68-
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) error {
73+
func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32, prio uint32) error {
6974
if currentIndex == lastIndex {
7075
return c.EndpointDelete(&EndpointKey{
7176
ServiceId: serviceId,
77+
Prio: prio,
7278
BackendIndex: lastIndex,
7379
})
7480
}
7581
lastKey := &EndpointKey{
7682
ServiceId: serviceId,
83+
Prio: prio,
7784
BackendIndex: lastIndex,
7885
}
7986
lastValue := &EndpointValue{}
@@ -83,6 +90,7 @@ func (c *Cache) EndpointSwap(currentIndex, lastIndex uint32, serviceId uint32) e
8390

8491
currentKey := &EndpointKey{
8592
ServiceId: serviceId,
93+
Prio: prio,
8694
BackendIndex: currentIndex,
8795
}
8896
currentValue := &EndpointValue{}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright The Kmesh Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package bpfcache
18+
19+
import (
20+
"sync"
21+
22+
"kmesh.net/kmesh/api/v2/workloadapi"
23+
)
24+
25+
// localityInfo records local node workload locality info
type localityInfo struct {
	region    string // init from workload.GetLocality().GetRegion()
	zone      string // init from workload.GetLocality().GetZone()
	subZone   string // init from workload.GetLocality().GetSubZone()
	nodeName  string // init from os.Getenv("NODE_NAME"), workload.GetNode()
	clusterId string // init from workload.GetClusterId()
	network   string // workload.GetNetwork()
}

// LocalityCache caches the local node's locality. mutex guards LocalityInfo,
// which may be read and written from different goroutines.
type LocalityCache struct {
	mutex        sync.RWMutex
	LocalityInfo *localityInfo // nil until SetLocality is first called
}

// NewLocalityCache returns a LocalityCache with no locality recorded yet.
func NewLocalityCache() LocalityCache {
	return LocalityCache{
		LocalityInfo: nil,
	}
}
45+
46+
func (l *LocalityCache) SetLocality(nodeName, clusterId, network string, locality *workloadapi.Locality) {
47+
l.mutex.Lock()
48+
defer l.mutex.Unlock()
49+
if l.LocalityInfo == nil {
50+
l.LocalityInfo = &localityInfo{}
51+
}
52+
53+
// notice: nodeName should set by processor or os.Getenv("NODE_NAME"),
54+
l.LocalityInfo.region = locality.GetRegion()
55+
l.LocalityInfo.zone = locality.GetZone()
56+
l.LocalityInfo.subZone = locality.GetSubzone()
57+
l.LocalityInfo.nodeName = nodeName
58+
l.LocalityInfo.clusterId = clusterId
59+
l.LocalityInfo.network = network
60+
}
61+
62+
func (l *LocalityCache) CalcLocalityLBPrio(wl *workloadapi.Workload, rp []workloadapi.LoadBalancing_Scope) uint32 {
63+
var rank uint32 = 0
64+
for _, scope := range rp {
65+
match := false
66+
switch scope {
67+
case workloadapi.LoadBalancing_REGION:
68+
if l.LocalityInfo.region == wl.GetLocality().GetRegion() {
69+
match = true
70+
}
71+
case workloadapi.LoadBalancing_ZONE:
72+
if l.LocalityInfo.zone == wl.GetLocality().GetZone() {
73+
match = true
74+
}
75+
case workloadapi.LoadBalancing_SUBZONE:
76+
if l.LocalityInfo.subZone == wl.GetLocality().GetSubzone() {
77+
match = true
78+
}
79+
case workloadapi.LoadBalancing_NODE:
80+
if l.LocalityInfo.nodeName == wl.GetNode() {
81+
match = true
82+
}
83+
case workloadapi.LoadBalancing_CLUSTER:
84+
if l.LocalityInfo.clusterId == wl.GetClusterId() {
85+
match = true
86+
}
87+
case workloadapi.LoadBalancing_NETWORK:
88+
log.Debugf("l.LocalityInfo.network %#v, wl.GetNetwork() %#v", l.LocalityInfo.network, wl.GetNetwork())
89+
if l.LocalityInfo.network == wl.GetNetwork() {
90+
match = true
91+
}
92+
}
93+
if match {
94+
rank++
95+
} else {
96+
break
97+
}
98+
}
99+
return uint32(len(rp)) - rank
100+
}

0 commit comments

Comments
 (0)