@@ -57,6 +57,8 @@ const (
5757 // Limit is required to avoid memory spikes during cache initialization.
5858 // The default limit of 50 is chosen based on experiments.
5959 defaultListSemaphoreWeight = 50
60+ // defaultEventProcessingInterval is the default interval for processing events
61+ defaultEventProcessingInterval = 100 * time .Millisecond
6062)
6163
6264const (
@@ -75,6 +77,11 @@ type apiMeta struct {
7577 watchCancel context.CancelFunc
7678}
7779
80+ type eventMeta struct {
81+ event watch.EventType
82+ un * unstructured.Unstructured
83+ }
84+
7885// ClusterInfo holds cluster cache stats
7986type ClusterInfo struct {
8087 // Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
96103// OnEventHandler is a function that handles Kubernetes event
97104type OnEventHandler func (event watch.EventType , un * unstructured.Unstructured )
98105
// OnProcessEventsHandler is a callback invoked after a batch of events has
// been processed. It receives the time spent on the batch and the number of
// events the batch contained.
type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)
108+
99109// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
100110type OnPopulateResourceInfoHandler func (un * unstructured.Unstructured , isRoot bool ) (info interface {}, cacheManifest bool )
101111
@@ -137,6 +147,8 @@ type ClusterCache interface {
137147 OnResourceUpdated (handler OnResourceUpdatedHandler ) Unsubscribe
138148 // OnEvent register event handler that is executed every time when new K8S event received
139149 OnEvent (handler OnEventHandler ) Unsubscribe
150+ // OnProcessEventsHandler register event handler that is executed every time when events were processed
151+ OnProcessEventsHandler (handler OnProcessEventsHandler ) Unsubscribe
140152}
141153
142154type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
153165 cache := & clusterCache {
154166 settings : Settings {ResourceHealthOverride : & noopSettings {}, ResourcesFilter : & noopSettings {}},
155167 apisMeta : make (map [schema.GroupKind ]* apiMeta ),
168+ eventMetaCh : nil ,
156169 listPageSize : defaultListPageSize ,
157170 listPageBufferSize : defaultListPageBufferSize ,
158171 listSemaphore : semaphore .NewWeighted (defaultListSemaphoreWeight ),
@@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
169182 },
170183 watchResyncTimeout : defaultWatchResyncTimeout ,
171184 clusterSyncRetryTimeout : ClusterRetryTimeout ,
185+ eventProcessingInterval : defaultEventProcessingInterval ,
172186 resourceUpdatedHandlers : map [uint64 ]OnResourceUpdatedHandler {},
173187 eventHandlers : map [uint64 ]OnEventHandler {},
188+ processEventsHandlers : map [uint64 ]OnProcessEventsHandler {},
174189 log : log ,
175190 listRetryLimit : 1 ,
176191 listRetryUseBackoff : false ,
@@ -185,16 +200,20 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
185200type clusterCache struct {
186201 syncStatus clusterCacheSync
187202
188- apisMeta map [schema.GroupKind ]* apiMeta
189- serverVersion string
190- apiResources []kube.APIResourceInfo
203+ apisMeta map [schema.GroupKind ]* apiMeta
204+ batchEventsProcessing bool
205+ eventMetaCh chan eventMeta
206+ serverVersion string
207+ apiResources []kube.APIResourceInfo
191208 // namespacedResources is a simple map which indicates a groupKind is namespaced
192209 namespacedResources map [schema.GroupKind ]bool
193210
194211 // maximum time we allow watches to run before relisting the group/kind and restarting the watch
195212 watchResyncTimeout time.Duration
196213 // sync retry timeout for cluster when sync error happens
197214 clusterSyncRetryTimeout time.Duration
215+ // ticker interval for events processing
216+ eventProcessingInterval time.Duration
198217
199218 // size of a page for list operations pager.
200219 listPageSize int64
@@ -224,6 +243,7 @@ type clusterCache struct {
224243 populateResourceInfoHandler OnPopulateResourceInfoHandler
225244 resourceUpdatedHandlers map [uint64 ]OnResourceUpdatedHandler
226245 eventHandlers map [uint64 ]OnEventHandler
246+ processEventsHandlers map [uint64 ]OnProcessEventsHandler
227247 openAPISchema openapi.Resources
228248 gvkParser * managedfields.GvkParser
229249
@@ -299,6 +319,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
299319 return handlers
300320}
301321
322+ // OnProcessEventsHandler register event handler that is executed every time when events were processed
323+ func (c * clusterCache ) OnProcessEventsHandler (handler OnProcessEventsHandler ) Unsubscribe {
324+ c .handlersLock .Lock ()
325+ defer c .handlersLock .Unlock ()
326+ key := c .handlerKey
327+ c .handlerKey ++
328+ c .processEventsHandlers [key ] = handler
329+ return func () {
330+ c .handlersLock .Lock ()
331+ defer c .handlersLock .Unlock ()
332+ delete (c .processEventsHandlers , key )
333+ }
334+ }
335+ func (c * clusterCache ) getProcessEventsHandlers () []OnProcessEventsHandler {
336+ c .handlersLock .Lock ()
337+ defer c .handlersLock .Unlock ()
338+ handlers := make ([]OnProcessEventsHandler , 0 , len (c .processEventsHandlers ))
339+ for _ , h := range c .processEventsHandlers {
340+ handlers = append (handlers , h )
341+ }
342+ return handlers
343+ }
344+
302345// GetServerVersion returns observed cluster version
303346func (c * clusterCache ) GetServerVersion () string {
304347 return c .serverVersion
@@ -440,6 +483,10 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
440483 for i := range opts {
441484 opts [i ](c )
442485 }
486+
487+ if c .batchEventsProcessing {
488+ c .invalidateEventMeta ()
489+ }
443490 c .apisMeta = nil
444491 c .namespacedResources = nil
445492 c .log .Info ("Invalidated cluster" )
@@ -669,7 +716,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
669716 return fmt .Errorf ("Failed to convert to *unstructured.Unstructured: %v" , event .Object )
670717 }
671718
672- c .processEvent (event .Type , obj )
719+ c .recordEvent (event .Type , obj )
673720 if kube .IsCRD (obj ) {
674721 var resources []kube.APIResourceInfo
675722 crd := v1.CustomResourceDefinition {}
@@ -823,6 +870,12 @@ func (c *clusterCache) sync() error {
823870 for i := range c .apisMeta {
824871 c .apisMeta [i ].watchCancel ()
825872 }
873+
874+ if c .batchEventsProcessing {
875+ c .invalidateEventMeta ()
876+ c .eventMetaCh = make (chan eventMeta )
877+ }
878+
826879 c .apisMeta = make (map [schema.GroupKind ]* apiMeta )
827880 c .resources = make (map [kube.ResourceKey ]* Resource )
828881 c .namespacedResources = make (map [schema.GroupKind ]bool )
@@ -864,6 +917,10 @@ func (c *clusterCache) sync() error {
864917 return err
865918 }
866919
920+ if c .batchEventsProcessing {
921+ go c .processEvents ()
922+ }
923+
867924 // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
868925 lock := sync.Mutex {}
869926 err = kube .RunAllAsync (len (apis ), func (i int ) error {
@@ -926,6 +983,14 @@ func (c *clusterCache) sync() error {
926983 return nil
927984}
928985
986+ // invalidateEventMeta closes the eventMeta channel if it is open
987+ func (c * clusterCache ) invalidateEventMeta () {
988+ if c .eventMetaCh != nil {
989+ close (c .eventMetaCh )
990+ c .eventMetaCh = nil
991+ }
992+ }
993+
929994// EnsureSynced checks cache state and synchronizes it if necessary
930995func (c * clusterCache ) EnsureSynced () error {
931996 syncStatus := & c .syncStatus
@@ -1231,7 +1296,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
12311296 return managedObjs , nil
12321297}
12331298
1234- func (c * clusterCache ) processEvent (event watch.EventType , un * unstructured.Unstructured ) {
1299+ func (c * clusterCache ) recordEvent (event watch.EventType , un * unstructured.Unstructured ) {
12351300 for _ , h := range c .getEventHandlers () {
12361301 h (event , un )
12371302 }
@@ -1240,15 +1305,74 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
12401305 return
12411306 }
12421307
1308+ if c .batchEventsProcessing {
1309+ c .eventMetaCh <- eventMeta {event , un }
1310+ } else {
1311+ c .lock .Lock ()
1312+ defer c .lock .Unlock ()
1313+ c .processEvent (key , eventMeta {event , un })
1314+ }
1315+ }
1316+
1317+ func (c * clusterCache ) processEvents () {
1318+ log := c .log .WithValues ("functionName" , "processItems" )
1319+ log .V (1 ).Info ("Start processing events" )
1320+
12431321 c .lock .Lock ()
1244- defer c .lock .Unlock ()
1322+ ch := c .eventMetaCh
1323+ c .lock .Unlock ()
1324+
1325+ eventMetas := make ([]eventMeta , 0 )
1326+ ticker := time .NewTicker (c .eventProcessingInterval )
1327+ defer ticker .Stop ()
1328+
1329+ for {
1330+ select {
1331+ case evMeta , ok := <- ch :
1332+ if ! ok {
1333+ log .V (2 ).Info ("Event processing channel closed, finish processing" )
1334+ return
1335+ }
1336+ eventMetas = append (eventMetas , evMeta )
1337+ case <- ticker .C :
1338+ if len (eventMetas ) > 0 {
1339+ c .processEventsBatch (eventMetas )
1340+ eventMetas = eventMetas [:0 ]
1341+ }
1342+ }
1343+ }
1344+ }
1345+
1346+ func (c * clusterCache ) processEventsBatch (eventMetas []eventMeta ) {
1347+ log := c .log .WithValues ("functionName" , "processEventsBatch" )
1348+ start := time .Now ()
1349+ c .lock .Lock ()
1350+ log .V (1 ).Info ("Lock acquired (ms)" , "duration" , time .Since (start ).Milliseconds ())
1351+ defer func () {
1352+ c .lock .Unlock ()
1353+ duration := time .Since (start )
1354+ // Update the metric with the duration of the events processing
1355+ for _ , handler := range c .getProcessEventsHandlers () {
1356+ handler (duration , len (eventMetas ))
1357+ }
1358+ }()
1359+
1360+ for _ , evMeta := range eventMetas {
1361+ key := kube .GetResourceKey (evMeta .un )
1362+ c .processEvent (key , evMeta )
1363+ }
1364+
1365+ log .V (1 ).Info ("Processed events (ms)" , "count" , len (eventMetas ), "duration" , time .Since (start ).Milliseconds ())
1366+ }
1367+
1368+ func (c * clusterCache ) processEvent (key kube.ResourceKey , evMeta eventMeta ) {
12451369 existingNode , exists := c .resources [key ]
1246- if event == watch .Deleted {
1370+ if evMeta . event == watch .Deleted {
12471371 if exists {
12481372 c .onNodeRemoved (key )
12491373 }
1250- } else if event != watch . Deleted {
1251- c .onNodeUpdated (existingNode , c .newResource (un ))
1374+ } else {
1375+ c .onNodeUpdated (existingNode , c .newResource (evMeta . un ))
12521376 }
12531377}
12541378
0 commit comments