@@ -27,7 +27,7 @@ import (
2727 "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2828)
2929
30- type watches = map [chan Response ]struct {}
30+ type watches = map [chan Response ]stream. StreamState
3131
3232// LinearCache supports collections of opaque resources. This cache has a
3333// single collection indexed by resource names and manages resource versions
@@ -114,24 +114,30 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
114114}
115115
116116func (cache * LinearCache ) respond (value chan Response , staleResources []string ) {
117- var resources []types.ResourceWithTTL
117+ var (
118+ resources []types.ResourceWithTTL
119+ respondResourceNames []string
120+ )
121+
118122 // TODO: optimize the resources slice creations across different clients
119123 if len (staleResources ) == 0 {
120124 resources = make ([]types.ResourceWithTTL , 0 , len (cache .resources ))
121- for _ , resource := range cache .resources {
125+ for name , resource := range cache .resources {
122126 resources = append (resources , types.ResourceWithTTL {Resource : resource })
127+ respondResourceNames = append (respondResourceNames , name )
123128 }
124129 } else {
125130 resources = make ([]types.ResourceWithTTL , 0 , len (staleResources ))
126131 for _ , name := range staleResources {
127132 resource := cache .resources [name ]
128133 if resource != nil {
129134 resources = append (resources , types.ResourceWithTTL {Resource : resource })
135+ respondResourceNames = append (respondResourceNames , name )
130136 }
131137 }
132138 }
133139 value <- & RawResponse {
134- Request : & Request {TypeUrl : cache .typeURL },
140+ Request : & Request {TypeUrl : cache .typeURL , ResourceNames : respondResourceNames },
135141 Resources : resources ,
136142 Version : cache .getVersion (),
137143 Ctx : context .Background (),
@@ -142,11 +148,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
142148 // de-duplicate watches that need to be responded
143149 notifyList := make (map [chan Response ][]string )
144150 for name := range modified {
145- for watch := range cache .watches [name ] {
146- notifyList [watch ] = append (notifyList [watch ], name )
151+ for watch , streamState := range cache .watches [name ] {
152+ resourceNames := streamState .GetKnownResourceNames (cache .typeURL )
153+ modifiedNameInResourceName := false
154+ for resourceName := range resourceNames {
155+ if ! modifiedNameInResourceName && resourceName == name {
156+ modifiedNameInResourceName = true
157+ }
158+ // To avoid the stale in notifyList becomes empty slice.
159+ // Don't skip resource name that has been deleted here.
160+ // It would be filtered out in respond because the corresponding resource has been deleted.
161+ notifyList [watch ] = append (notifyList [watch ], resourceName )
162+ }
163+ if ! modifiedNameInResourceName {
164+ notifyList [watch ] = append (notifyList [watch ], name )
165+ }
147166 }
148167 delete (cache .watches , name )
149168 }
169+
150170 for value , stale := range notifyList {
151171 cache .respond (value , stale )
152172 }
@@ -328,10 +348,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
328348 stale = lastVersion != cache .version
329349 } else {
330350 for _ , name := range request .ResourceNames {
351+ _ , has := streamState .GetKnownResourceNames (request .TypeUrl )[name ]
352+ version , exists := cache .versionVector [name ]
353+
331354 // When a resource is removed, its version defaults 0 and it is not considered stale.
332- if lastVersion < cache . versionVector [ name ] {
355+ if lastVersion < version || ( ! has && exists ) {
333356 stale = true
334- staleResources = append (staleResources , name )
357+
358+ // Here we collect all requested names.
359+ // It would be filtered out in respond if the resource name doesn't appear in cache.
360+ staleResources = request .ResourceNames
335361 }
336362 }
337363 }
@@ -341,7 +367,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
341367 }
342368 // Create open watches since versions are up to date.
343369 if len (request .ResourceNames ) == 0 {
344- cache .watchAll [value ] = struct {}{}
370+ cache .watchAll [value ] = streamState
345371 return func () {
346372 cache .mu .Lock ()
347373 defer cache .mu .Unlock ()
@@ -354,7 +380,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
354380 set = make (watches )
355381 cache .watches [name ] = set
356382 }
357- set [value ] = struct {}{}
383+ set [value ] = streamState
358384 }
359385 return func () {
360386 cache .mu .Lock ()
0 commit comments