Skip to content

Commit 3a81bcc

Browse files
authored
Merge pull request #1275 from weli-l/dev/auth_ip_optimize
add channel for Rbac when Kmesh restart
2 parents e42306c + 598ab29 commit 3a81bcc

File tree

3 files changed

+41
-3
lines changed

3 files changed

+41
-3
lines changed

pkg/controller/workload/workload_controller.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package workload
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223

2324
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2425

@@ -66,7 +67,21 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM
6667
}
6768

6869
func (c *Controller) Run(ctx context.Context) {
69-
go c.Rbac.Run(ctx, c.bpfWorkloadObj.SockOps.KmAuthReq, c.bpfWorkloadObj.XdpAuth.KmAuthRes)
70+
var wg sync.WaitGroup
71+
wg.Add(2)
72+
go func() {
73+
<-c.Processor.addressDone
74+
wg.Done()
75+
}()
76+
go func() {
77+
<-c.Processor.authzDone
78+
wg.Done()
79+
}()
80+
go func() {
81+
wg.Wait()
82+
c.Rbac.Run(ctx, c.bpfWorkloadObj.SockOps.KmAuthReq, c.bpfWorkloadObj.XdpAuth.KmAuthRes)
83+
}()
84+
7085
go c.MetricController.Run(ctx, c.bpfWorkloadObj.SockConn.KmTcpProbe)
7186
if c.MapMetricController != nil {
7287
go c.MapMetricController.Run(ctx)

pkg/controller/workload/workload_controller_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,12 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
199199
}
200200
}
201201

202-
func TestAdsStream_AdsStreamProcess(t *testing.T) {
202+
func TestWorkloadStream_WorkloadstreamProcess(t *testing.T) {
203203
workloadStream := Controller{
204204
Processor: &Processor{
205-
ack: &discoveryv3.DeltaDiscoveryRequest{},
205+
ack: &discoveryv3.DeltaDiscoveryRequest{},
206+
addressDone: make(chan struct{}),
207+
authzDone: make(chan struct{}),
206208
},
207209
}
208210

@@ -217,6 +219,12 @@ func TestAdsStream_AdsStreamProcess(t *testing.T) {
217219

218220
patches1 := gomonkey.NewPatches()
219221
patches2 := gomonkey.NewPatches()
222+
consumeSignals := func() {
223+
go func() {
224+
<-workloadStream.Processor.addressDone
225+
<-workloadStream.Processor.authzDone
226+
}()
227+
}
220228
tests := []struct {
221229
name string
222230
beforeFunc func()
@@ -302,6 +310,7 @@ func TestAdsStream_AdsStreamProcess(t *testing.T) {
302310
for _, tt := range tests {
303311
t.Run(tt.name, func(t *testing.T) {
304312
tt.beforeFunc()
313+
consumeSignals()
305314
err := workloadStream.HandleWorkloadStream()
306315

307316
if (err != nil) != tt.wantErr {

pkg/controller/workload/workload_processor.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ type Processor struct {
6666

6767
once sync.Once
6868
authzOnce sync.Once
69+
70+
// used to notify Rbac the address/authz type response is done when Kmesh restart
71+
addressDone chan struct{}
72+
authzDone chan struct{}
73+
addressRespOnce sync.Once
74+
authzRespOnce sync.Once
6975
}
7076

7177
func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
@@ -80,6 +86,8 @@ func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
8086
EndpointCache: cache.NewEndpointCache(),
8187
WaypointCache: cache.NewWaypointCache(serviceCache),
8288
locality: bpf.NewLocalityCache(),
89+
addressDone: make(chan struct{}, 1),
90+
authzDone: make(chan struct{}, 1),
8391
}
8492
}
8593

@@ -119,8 +127,14 @@ func (p *Processor) processWorkloadResponse(rsp *service_discovery_v3.DeltaDisco
119127
switch rsp.GetTypeUrl() {
120128
case AddressType:
121129
err = p.handleAddressTypeResponse(rsp)
130+
p.addressRespOnce.Do(func() {
131+
p.addressDone <- struct{}{}
132+
})
122133
case AuthorizationType:
123134
err = p.handleAuthorizationTypeResponse(rsp, rbac)
135+
p.authzRespOnce.Do(func() {
136+
p.authzDone <- struct{}{}
137+
})
124138
default:
125139
err = fmt.Errorf("unsupported type url %s", rsp.GetTypeUrl())
126140
}

0 commit comments

Comments
 (0)