From 8e247d6d5886cabe71fc2fdbfdadc1697230841b Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 21 Dec 2021 17:25:21 +0800 Subject: [PATCH 01/11] server: use separate goroutines for sotw bidi streams (#530) Signed-off-by: Rueian --- go.mod | 1 + go.sum | 1 + pkg/server/sotw/v3/server.go | 180 +++++++++++++++------------------ pkg/server/sotw/v3/watches.go | 56 +++++----- pkg/server/stream/v3/stream.go | 48 +++++++++ 5 files changed, 154 insertions(+), 132 deletions(-) diff --git a/go.mod b/go.mod index 830a795df3..8079cd04a1 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/stretchr/testify v1.7.0 go.opentelemetry.io/proto/otlp v0.7.0 golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect + golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.36.0 diff --git a/go.sum b/go.sum index 28c9b41f49..15b45aa4c0 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index d3b167379a..8d0776e89b 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,10 +18,10 @@ package sotw import ( "context" "errors" - "reflect" "strconv" "sync/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -63,15 +63,6 @@ type server struct { streamCount int64 } -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} -} - // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -81,14 +72,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 - streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} + streamState := stream.NewSTOWStreamState() // a collection of stack allocated watches per request type watches := newWatches() defer func() { - watches.close() if s.callbacks != nil { s.callbacks.OnStreamClosed(streamID) } @@ -109,14 +98,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + lastResponse := stream.NewLastDiscoveryResponse(out.Nonce, resp.GetRequest().ResourceNames) + streamState.Set(resp.GetRequest().TypeUrl, lastResponse) if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) @@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // node may only be set on the first discovery request var node = &core.Node{} - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } + var resCh = make(chan cache.Response, 1) - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } + ctx, cancel := context.WithCancel(s.ctx) + eg, ctx := errgroup.WithContext(ctx) - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() + eg.Go(func() error { + defer func() { + watches.close() // this should remove all watches from the cache + close(resCh) // close resCh and let the second eg.Go drain it + }() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + for { + select { + case <-ctx.Done(): + return nil + case req, more := <-reqCh: + if !more { + return nil } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.TypeUrl == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } else if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL } - } - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } } - } - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() + if lastResponse, ok := streamState.Get(req.TypeUrl); ok { + if lastResponse.Nonce == "" || lastResponse.Nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.Resources) + } + } + typeURL := req.GetTypeUrl() + if w := watches.getWatch(typeURL); w != nil { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if n := w.getNonce(); n == "" || n == nonce { + w.close() + + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), }) } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) } + } + }) - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + eg.Go(func() (err error) { + var nonce string + for res := range resCh { + if res == nil || err != nil { + continue // this loop should not exit until resCh closed } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err + if nonce, err = send(res); err == nil { + if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil { + w.setNonce(nonce) + } + } else { + cancel() } - - watches.responders[res.GetRequest().TypeUrl].nonce = nonce } - } + return err + }) + + return eg.Wait() } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..b5c659d3f1 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -1,36 +1,38 @@ package sotw import ( - "context" - "reflect" + "sync" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" ) // watches for all xDS resource types type watches struct { + mu sync.RWMutex responders map[string]*watch - - // cases is a dynamic select case for the watched channels. - cases []reflect.SelectCase } // newWatches creates and initializes watches. func newWatches() watches { return watches{ responders: make(map[string]*watch, int(types.UnknownType)), - cases: make([]reflect.SelectCase, 0), } } // addWatch creates a new watch entry in the watches map. // Watches are sorted by typeURL. func (w *watches) addWatch(typeURL string, watch *watch) { + w.mu.Lock() + defer w.mu.Unlock() w.responders[typeURL] = watch } +func (w *watches) getWatch(typeURL string) (watch *watch) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.responders[typeURL] +} + // close all open watches func (w *watches) close() { for _, watch := range w.responders { @@ -38,33 +40,23 @@ func (w *watches) close() { } } -// recomputeWatches rebuilds the known list of dynamic channels if needed -func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) { - w.cases = w.cases[:0] // Clear the existing cases while retaining capacity. - - w.cases = append(w.cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(req), - }, - ) +// watch contains the necessary modifiables for receiving resource responses +type watch struct { + mu sync.RWMutex + cancel func() + nonce string +} - for _, watch := range w.responders { - w.cases = append(w.cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(watch.response), - }) - } +func (w *watch) getNonce() (n string) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.nonce } -// watch contains the necessary modifiables for receiving resource responses -type watch struct { - cancel func() - nonce string - response chan cache.Response +func (w *watch) setNonce(n string) { + w.mu.Lock() + defer w.mu.Unlock() + w.nonce = n } // close cancels an open watch diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 3a4247c457..054991078e 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -1,6 +1,8 @@ package stream import ( + "sync" + "google.golang.org/grpc" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" @@ -85,3 +87,49 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St return state } + +func NewLastDiscoveryResponse(nonce string, resources []string) LastDiscoveryResponse { + resp := LastDiscoveryResponse{ + Nonce: nonce, + Resources: make(map[string]struct{}), + } + for _, r := range resources { + resp.Resources[r] = struct{}{} + } + return resp +} + +// LastDiscoveryResponse that is sent over GRPC stream +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type LastDiscoveryResponse struct { + Nonce string + Resources map[string]struct{} +} + +func NewSTOWStreamState() STOWStreamState { + return STOWStreamState{ + StreamState: NewStreamState(false, map[string]string{}), + responses: make(map[string]LastDiscoveryResponse), + } +} + +type STOWStreamState struct { + StreamState + responses map[string]LastDiscoveryResponse + mu sync.RWMutex +} + +func (l *STOWStreamState) Set(key string, value LastDiscoveryResponse) { + l.mu.Lock() + defer l.mu.Unlock() + l.responses[key] = value +} + +func (l *STOWStreamState) Get(key string) (value LastDiscoveryResponse, ok bool) { + l.mu.RLock() + defer l.mu.RUnlock() + value, ok = l.responses[key] + return +} From 1c3d23f1c4567c41a1e89a8697da30c87463a436 Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 13 Jan 2022 23:54:05 +0800 Subject: [PATCH 02/11] server: test the deadlock between LinearCache and SOTW server (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 63 ++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index af349e0c8a..102d8fdb13 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -18,8 +18,11 @@ import ( "context" "errors" "fmt" + "math/rand" "reflect" + "strconv" "sync" + "sync/atomic" "testing" "time" @@ -667,3 +670,63 @@ func TestCallbackError(t *testing.T) { }) } } + +func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { + for _, typ := range testTypes { + t.Run(typ, func(t *testing.T) { + t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + + nonce := int64(0) + ver, targetVer := uint64(0), uint64(100000) + untilVerExceed := func(exceed uint64, fn func(current uint64)) { + for current := atomic.LoadUint64(&ver); current < exceed; current = atomic.LoadUint64(&ver) { + fn(current) + } + } + + config := cache.NewLinearCache(typ) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + + resources := make([]string, 10) + for i := range resources { + resources[i] = strconv.Itoa(i) + } + + mStream := makeMockStream(t) + + go func() { + untilVerExceed(targetVer, func(current uint64) { + mStream.recv <- &discovery.DiscoveryRequest{ + Node: node, + TypeUrl: typ, + ResourceNames: resources, + VersionInfo: strconv.FormatUint(current, 10), + ResponseNonce: strconv.FormatInt(atomic.LoadInt64(&nonce), 10), + } + }) + close(mStream.recv) + }() + + go func() { + untilVerExceed(targetVer, func(current uint64) { + config.SetResources(map[string]types.Resource{ + resources[rand.Intn(len(resources))]: opaque, //nolint + }) + }) + }() + + go func() { + for resp := range mStream.sent { + v, _ := strconv.ParseUint(resp.VersionInfo, 10, 64) + atomic.StoreUint64(&ver, v) + n, _ := strconv.ParseInt(resp.Nonce, 10, 64) + atomic.StoreInt64(&nonce, n) + } + }() + + err := s.StreamAggregatedResources(mStream) + assert.Nil(t, err) + close(mStream.sent) + }) + } +} From b6d83f7aa3eb06522a8cb7e35ee36ddacaa2599e Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 21 Jan 2022 00:12:32 +0800 Subject: [PATCH 03/11] server: reword comment of the deadlock test (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 102d8fdb13..3a6b8c0da7 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -674,7 +674,7 @@ func TestCallbackError(t *testing.T) { func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { - t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + t.Log("Integrating LinearCache with SOTW server. If this is never completed, it might be because they are dead locked.") nonce := int64(0) ver, targetVer := uint64(0), uint64(100000) From ad49378f24e0ddde42471353478ee965c0e45d78 Mon Sep 17 00:00:00 2001 From: Rueian Date: Tue, 21 Dec 2021 17:25:21 +0800 Subject: [PATCH 04/11] server: use separate goroutines for sotw bidi streams (#530) Signed-off-by: Rueian --- go.mod | 1 + go.sum | 2 + pkg/server/sotw/v3/server.go | 180 +++++++++++++++------------------ pkg/server/sotw/v3/watches.go | 56 +++++----- pkg/server/stream/v3/stream.go | 48 +++++++++ 5 files changed, 155 insertions(+), 132 deletions(-) diff --git a/go.mod b/go.mod index ce57eb360d..1894f8515c 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/stretchr/testify v1.7.1 go.opentelemetry.io/proto/otlp v0.15.0 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 google.golang.org/grpc v1.45.0 google.golang.org/protobuf v1.28.0 diff --git a/go.sum b/go.sum index 950d7d44c6..30e87b0112 100644 --- a/go.sum +++ b/go.sum @@ -253,6 +253,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 91681237c9..f00169d4e1 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,10 +18,10 @@ package sotw import ( "context" "errors" - "reflect" "strconv" "sync/atomic" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -63,15 +63,6 @@ type server struct { streamCount int64 } -// Discovery response that is sent over GRPC stream -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} -} - // process handles a bi-di stream request func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // increment stream count @@ -81,8 +72,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 - streamState := stream.NewStreamState(false, map[string]string{}) - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} + streamState := stream.NewSTOWStreamState() // a collection of stack allocated watches per request type watches := newWatches() @@ -91,7 +81,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq var node = &core.Node{} defer func() { - watches.close() if s.callbacks != nil { s.callbacks.OnStreamClosed(streamID, node) } @@ -112,14 +101,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) - lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, - resources: make(map[string]struct{}), - } - for _, r := range resp.GetRequest().ResourceNames { - lastResponse.resources[r] = struct{}{} - } - lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + lastResponse := stream.NewLastDiscoveryResponse(out.Nonce, resp.GetRequest().ResourceNames) + streamState.Set(resp.GetRequest().TypeUrl, lastResponse) if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) @@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } - // recompute dynamic channels for this stream - watches.recompute(s.ctx, reqCh) - - for { - // The list of select cases looks like this: - // 0: <- ctx.Done - // 1: <- reqCh - // 2...: per type watches - index, value, ok := reflect.Select(watches.cases) - switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed - case 0: - return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed - case 1: - // input stream ended or errored out - if !ok { - return nil - } + var resCh = make(chan cache.Response, 1) - req := value.Interface().(*discovery.DiscoveryRequest) - if req == nil { - return status.Errorf(codes.Unavailable, "empty request") - } + ctx, cancel := context.WithCancel(s.ctx) + eg, ctx := errgroup.WithContext(ctx) - // node field in discovery request is delta-compressed - if req.Node != nil { - node = req.Node - } else { - req.Node = node - } - - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() + eg.Go(func() error { + defer func() { + watches.close() // this should remove all watches from the cache + close(resCh) // close resCh and let the second eg.Go drain it + }() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + for { + select { + case <-ctx.Done(): + return nil + case req, more := <-reqCh: + if !more { + return nil } - } else if req.TypeUrl == "" { - req.TypeUrl = defaultTypeURL - } + if req == nil { + return status.Errorf(codes.Unavailable, "empty request") + } + // node field in discovery request is delta-compressed + if req.Node != nil { + node = req.Node + } else { + req.Node = node + } + + // nonces can be reused across streams; we verify nonce only if nonce is not initialized + nonce := req.GetResponseNonce() - if s.callbacks != nil { - if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { - return err + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType { + if req.TypeUrl == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + } + } else if req.TypeUrl == "" { + req.TypeUrl = defaultTypeURL } - } - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + if s.callbacks != nil { + if err := s.callbacks.OnStreamRequest(streamID, req); err != nil { + return err + } } - } - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() + if lastResponse, ok := streamState.Get(req.TypeUrl); ok { + if lastResponse.Nonce == "" || lastResponse.Nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.Resources) + } + } + typeURL := req.GetTypeUrl() + if w := watches.getWatch(typeURL); w != nil { + // We've found a pre-existing watch, lets check and update if needed. + // If these requirements aren't satisfied, leave an open watch. + if n := w.getNonce(); n == "" || n == nonce { + w.close() + + watches.addWatch(typeURL, &watch{ + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), + }) + } + } else { + // No pre-existing watch exists, let's create one. + // We need to precompute the watches first then open a watch in the cache. watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, + cancel: s.cache.CreateWatch(req, streamState.StreamState, resCh), }) } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, streamState, responder), - response: responder, - }) } + } + }) - // Recompute the dynamic select cases for this stream. - watches.recompute(s.ctx, reqCh) - default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL - if !ok { - // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? - return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) + eg.Go(func() (err error) { + var nonce string + for res := range resCh { + if res == nil || err != nil { + continue // this loop should not exit until resCh closed } - - res := value.Interface().(cache.Response) - nonce, err := send(res) - if err != nil { - return err + if nonce, err = send(res); err == nil { + if w := watches.getWatch(res.GetRequest().TypeUrl); w != nil { + w.setNonce(nonce) + } + } else { + cancel() } - - watches.responders[res.GetRequest().TypeUrl].nonce = nonce } - } + return err + }) + + return eg.Wait() } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index 45670d6a91..b5c659d3f1 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -1,36 +1,38 @@ package sotw import ( - "context" - "reflect" + "sync" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" ) // watches for all xDS resource types type watches struct { + mu sync.RWMutex responders map[string]*watch - - // cases is a dynamic select case for the watched channels. - cases []reflect.SelectCase } // newWatches creates and initializes watches. func newWatches() watches { return watches{ responders: make(map[string]*watch, int(types.UnknownType)), - cases: make([]reflect.SelectCase, 0), } } // addWatch creates a new watch entry in the watches map. // Watches are sorted by typeURL. func (w *watches) addWatch(typeURL string, watch *watch) { + w.mu.Lock() + defer w.mu.Unlock() w.responders[typeURL] = watch } +func (w *watches) getWatch(typeURL string) (watch *watch) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.responders[typeURL] +} + // close all open watches func (w *watches) close() { for _, watch := range w.responders { @@ -38,33 +40,23 @@ func (w *watches) close() { } } -// recomputeWatches rebuilds the known list of dynamic channels if needed -func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) { - w.cases = w.cases[:0] // Clear the existing cases while retaining capacity. - - w.cases = append(w.cases, - reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - }, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(req), - }, - ) +// watch contains the necessary modifiables for receiving resource responses +type watch struct { + mu sync.RWMutex + cancel func() + nonce string +} - for _, watch := range w.responders { - w.cases = append(w.cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(watch.response), - }) - } +func (w *watch) getNonce() (n string) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.nonce } -// watch contains the necessary modifiables for receiving resource responses -type watch struct { - cancel func() - nonce string - response chan cache.Response +func (w *watch) setNonce(n string) { + w.mu.Lock() + defer w.mu.Unlock() + w.nonce = n } // close cancels an open watch diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index b5832b7d58..093dd46d7d 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -1,6 +1,8 @@ package stream import ( + "sync" + "google.golang.org/grpc" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" @@ -124,3 +126,49 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St return state } + +func NewLastDiscoveryResponse(nonce string, resources []string) LastDiscoveryResponse { + resp := LastDiscoveryResponse{ + Nonce: nonce, + Resources: make(map[string]struct{}), + } + for _, r := range resources { + resp.Resources[r] = struct{}{} + } + return resp +} + +// LastDiscoveryResponse that is sent over GRPC stream +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type LastDiscoveryResponse struct { + Nonce string + Resources map[string]struct{} +} + +func NewSTOWStreamState() STOWStreamState { + return STOWStreamState{ + StreamState: NewStreamState(false, map[string]string{}), + responses: make(map[string]LastDiscoveryResponse), + } +} + +type STOWStreamState struct { + StreamState + responses map[string]LastDiscoveryResponse + mu sync.RWMutex +} + +func (l *STOWStreamState) Set(key string, value LastDiscoveryResponse) { + l.mu.Lock() + defer l.mu.Unlock() + l.responses[key] = value +} + +func (l *STOWStreamState) Get(key string) (value LastDiscoveryResponse, ok bool) { + l.mu.RLock() + defer l.mu.RUnlock() + value, ok = l.responses[key] + return +} From e0c74fbcf0781ca533c408d3964ebb219f1e512d Mon Sep 17 00:00:00 2001 From: Rueian Date: Thu, 13 Jan 2022 23:54:05 +0800 Subject: [PATCH 05/11] server: test the deadlock between LinearCache and SOTW server (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 63 ++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 02078bc3ad..9438682041 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -18,8 +18,11 @@ import ( "context" "errors" "fmt" + "math/rand" "reflect" + "strconv" "sync" + "sync/atomic" "testing" "time" @@ -682,3 +685,63 @@ func TestCallbackError(t *testing.T) { }) } } + +func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { + for _, typ := range testTypes { + t.Run(typ, func(t *testing.T) { + t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + + nonce := int64(0) + ver, targetVer := uint64(0), uint64(100000) + untilVerExceed := func(exceed uint64, fn func(current uint64)) { + for current := atomic.LoadUint64(&ver); current < exceed; current = atomic.LoadUint64(&ver) { + fn(current) + } + } + + config := cache.NewLinearCache(typ) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + + resources := make([]string, 10) + for i := range resources { + resources[i] = strconv.Itoa(i) + } + + mStream := makeMockStream(t) + + go func() { + untilVerExceed(targetVer, func(current uint64) { + mStream.recv <- &discovery.DiscoveryRequest{ + Node: node, + TypeUrl: typ, + ResourceNames: resources, + VersionInfo: strconv.FormatUint(current, 10), + ResponseNonce: strconv.FormatInt(atomic.LoadInt64(&nonce), 10), + } + }) + close(mStream.recv) + }() + + go func() { + untilVerExceed(targetVer, func(current uint64) { + config.SetResources(map[string]types.Resource{ + resources[rand.Intn(len(resources))]: opaque, //nolint + }) + }) + }() + + go func() { + for resp := range mStream.sent { + v, _ := strconv.ParseUint(resp.VersionInfo, 10, 64) + atomic.StoreUint64(&ver, v) + n, _ := strconv.ParseInt(resp.Nonce, 10, 64) + atomic.StoreInt64(&nonce, n) + } + }() + + err := s.StreamAggregatedResources(mStream) + assert.Nil(t, err) + close(mStream.sent) + }) + } +} From 6a2832937b0dcc2ec6265795f0b1f7b59adacbb9 Mon Sep 17 00:00:00 2001 From: Rueian Date: Fri, 21 Jan 2022 00:12:32 +0800 Subject: [PATCH 06/11] server: reword comment of the deadlock test (#530) Signed-off-by: Rueian --- pkg/server/v3/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 9438682041..665b954e4a 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -689,7 +689,7 @@ func TestCallbackError(t *testing.T) { func TestSOTWLinearCacheIntegrationDeadLock(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { - t.Log("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked") + t.Log("Integrating LinearCache with SOTW server. If this is never completed, it might be because they are dead locked.") nonce := int64(0) ver, targetVer := uint64(0), uint64(100000) From 65e49402cb7002b94133ab8dcd049bf8b1720749 Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Mon, 28 Feb 2022 12:09:10 +0800 Subject: [PATCH 07/11] linear: keep streamState to respond resources Signed-off-by: Fu-Sheng --- pkg/cache/v3/linear.go | 46 +++++++++++++++++++++++++++++-------- pkg/cache/v3/linear_test.go | 7 +++++- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index ae55105d59..074c41d639 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -27,7 +27,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) -type watches = map[chan Response]struct{} +type watches = map[chan Response]stream.StreamState // LinearCache supports collections of opaque resources. This cache has a // single collection indexed by resource names and manages resource versions @@ -114,12 +114,17 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { } func (cache *LinearCache) respond(value chan Response, staleResources []string) { - var resources []types.ResourceWithTTL + var ( + resources []types.ResourceWithTTL + respondResourceNames []string + ) + // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for _, resource := range cache.resources { + for name, resource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + respondResourceNames = append(respondResourceNames, name) } } else { resources = make([]types.ResourceWithTTL, 0, len(staleResources)) @@ -127,11 +132,12 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string) resource := cache.resources[name] if resource != nil { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + respondResourceNames = append(respondResourceNames, name) } } } value <- &RawResponse{ - Request: &Request{TypeUrl: cache.typeURL}, + Request: &Request{TypeUrl: cache.typeURL, ResourceNames: respondResourceNames}, Resources: resources, Version: cache.getVersion(), Ctx: context.Background(), @@ -142,11 +148,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { // de-duplicate watches that need to be responded notifyList := make(map[chan Response][]string) for name := range modified { - for watch := range cache.watches[name] { - notifyList[watch] = append(notifyList[watch], name) + for watch, streamState := range cache.watches[name] { + resourceNames := streamState.GetKnownResourceNames(cache.typeURL) + modifiedNameInResourceName := false + for resourceName := range resourceNames { + if !modifiedNameInResourceName && resourceName == name { + modifiedNameInResourceName = true + } + // To avoid the stale in notifyList becomes empty slice. + // Don't skip resource name that has been deleted here. + // It would be filtered out in respond because the corresponding resource has been deleted. + notifyList[watch] = append(notifyList[watch], resourceName) + } + if !modifiedNameInResourceName { + notifyList[watch] = append(notifyList[watch], name) + } } delete(cache.watches, name) } + for value, stale := range notifyList { cache.respond(value, stale) } @@ -328,10 +348,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea stale = lastVersion != cache.version } else { for _, name := range request.ResourceNames { + _, has := streamState.GetKnownResourceNames(request.TypeUrl)[name] + version, exists := cache.versionVector[name] + // When a resource is removed, its version defaults 0 and it is not considered stale. - if lastVersion < cache.versionVector[name] { + if lastVersion < version || (!has && exists) { stale = true - staleResources = append(staleResources, name) + + // Here we collect all requested names. + // It would be filtered out in respond if the resource name doesn't appear in cache. + staleResources = request.ResourceNames } } } @@ -341,7 +367,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea } // Create open watches since versions are up to date. if len(request.ResourceNames) == 0 { - cache.watchAll[value] = struct{}{} + cache.watchAll[value] = streamState return func() { cache.mu.Lock() defer cache.mu.Unlock() @@ -354,7 +380,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea set = make(watches) cache.watches[name] = set } - set[value] = struct{}{} + set[value] = streamState } return func() { cache.mu.Lock() diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 617d90366e..cd822fb0a2 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -274,6 +274,7 @@ func TestLinearSetResources(t *testing.T) { // Create new resources w1 := make(chan Response, 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) mustBlock(t, w1) w2 := make(chan Response, 1) @@ -341,6 +342,7 @@ func TestLinearVersionPrefix(t *testing.T) { c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "instance1-1", 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -350,6 +352,7 @@ func TestLinearDeletion(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -369,6 +372,7 @@ func TestLinearWatchTwo(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a", "b"}) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) w1 := make(chan Response, 1) @@ -376,7 +380,7 @@ func TestLinearWatchTwo(t *testing.T) { mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource - verifyResponse(t, w, "1", 1) + verifyResponse(t, w, "1", 2) verifyResponse(t, w1, "1", 2) } @@ -394,6 +398,7 @@ func TestLinearCancel(t *testing.T) { checkWatchCount(t, c, "a", 0) // cancel watch for "a" + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) From 342cb6fb54255ee63d657e44726fd93b5a05da7d Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Sat, 5 Mar 2022 10:17:01 +0800 Subject: [PATCH 08/11] Kick CI Signed-off-by: Fu-Sheng From 54bb6cd1697eedf791a2879275dfe84b44b099db Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Sun, 4 Sep 2022 16:48:24 +0800 Subject: [PATCH 09/11] linear: fix test cases Signed-off-by: Fu-Sheng --- pkg/cache/v3/linear_test.go | 5 +++-- pkg/server/stream/v3/stream.go | 12 ++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index cd822fb0a2..a7656fac50 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -742,6 +742,7 @@ func TestLinearMixedWatches(t *testing.T) { assert.Equal(t, 2, c.NumResources()) sotwState := stream.NewStreamState(false, nil) + sotwState.SetKnownResourceNamesAsList(testType, []string{"a", "b"}) w := make(chan Response, 1) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) mustBlock(t, w) @@ -754,7 +755,7 @@ func TestLinearMixedWatches(t *testing.T) { err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) assert.NoError(t, err) // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation - verifyResponse(t, w, c.getVersion(), 1) + verifyResponse(t, w, c.getVersion(), 2) checkVersionMapNotSet(t, c) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) @@ -775,6 +776,6 @@ func TestLinearMixedWatches(t *testing.T) { assert.NoError(t, err) checkVersionMapSet(t, c) - verifyResponse(t, w, c.getVersion(), 0) + verifyResponse(t, w, c.getVersion(), 1) verifyDeltaResponse(t, wd, nil, []string{"b"}) } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 093dd46d7d..a73083c7d5 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -41,6 +41,8 @@ type StreamState struct { // nolint:golint,revive // indicates whether the object has been modified since its creation first bool + + mu *sync.RWMutex } // GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to @@ -95,10 +97,16 @@ func (s *StreamState) IsWildcard() bool { } func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { + s.mu.Lock() + defer s.mu.Unlock() + s.knownResourceNames[url] = names } func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { + s.mu.Lock() + defer s.mu.Unlock() + m := map[string]struct{}{} for _, name := range names { m[name] = struct{}{} @@ -107,6 +115,9 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { } func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { + s.mu.Lock() + defer s.mu.Unlock() + return s.knownResourceNames[url] } @@ -118,6 +129,7 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St resourceVersions: initialResourceVersions, first: true, knownResourceNames: map[string]map[string]struct{}{}, + mu: &sync.RWMutex{}, } if initialResourceVersions == nil { From 8b4033fbf089380fc911fe0013f5256c1c41eee0 Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Sun, 4 Sep 2022 19:20:33 +0800 Subject: [PATCH 10/11] linear: use RLock when reading Signed-off-by: Fu-Sheng --- pkg/server/stream/v3/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index a73083c7d5..4c1dc48b8d 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -115,8 +115,8 @@ func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { } func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() return s.knownResourceNames[url] } From 391a809bb3ed360c47b040376db2cda95caadd72 Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Sat, 10 Sep 2022 11:31:11 +0800 Subject: [PATCH 11/11] linear: fix deps Signed-off-by: Fu-Sheng --- go.sum | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/go.sum b/go.sum index 6e8a95bb11..30e87b0112 100644 --- a/go.sum +++ b/go.sum @@ -253,6 +253,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -283,6 +284,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 h1:uCLL3g5wH2xjxVREVuAbP9JM5PPKjRbXKRa6IBjkzmU= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -292,6 +294,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -340,6 +343,7 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -394,6 +398,7 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 h1:HOL66YCI20JvN2hVk6o2YIp9i/3RvzVUz82PqNr7fXw= google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -411,6 +416,7 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -425,12 +431,15 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=