@@ -41,7 +41,7 @@ type changeListener struct {
4141 FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc)
4242 counter uint64 // Event counter; increments on every doc update
4343 _terminateCheckCounter uint64 // Termination Event counter; increments on every notifyCheckForTermination
44- keyCounts map [string ]uint64 // Latest count at which each doc key was updated
44+ keyCounts map [channels. ID ]uint64 // Latest count at which each doc key was updated
4545 OnChangeCallback DocChangedFunc
4646 terminator chan bool // Signal to cause DCP feed to exit
4747 broadcastChangesDoneChan chan struct {} // Channel to signal that broadcast changes goroutine has terminated
@@ -53,13 +53,16 @@ type changeListener struct {
5353// unusedSeqChannelID marks the unused sequence key for the channel cache. This is a marker that is global to all collections.
5454var unusedSeqChannelID = channels .NewID (unusedSeqKey , unusedSeqCollectionID )
5555
56+ // principalDocCollectionIDForChannelID is the collection ID for construction of channel ID for principal documents (users, roles).
57+ const principalDocCollectionIDForChannelID = 0
58+
5659type DocChangedFunc func (event sgbucket.FeedEvent , docType DocumentType )
5760
5861func (listener * changeListener ) Init (name string , groupID string , db * DatabaseContext ) {
5962 listener .bucketName = name
6063 listener .counter = 1
6164 listener ._terminateCheckCounter = 0
62- listener .keyCounts = map [string ]uint64 {}
65+ listener .keyCounts = map [channels. ID ]uint64 {}
6366 listener .tapNotifier = sync .NewCond (& sync.Mutex {})
6467 listener .sgCfgPrefix = db .MetadataKeys .SGCfgPrefix (groupID )
6568 listener .metaKeys = db .MetadataKeys
@@ -181,7 +184,8 @@ func (listener *changeListener) ProcessFeedEvent(event sgbucket.FeedEvent) bool
181184 docType := listener .DocumentType (event .Key )
182185 if docType == DocTypeUser || docType == DocTypeRole {
183186 // defer to notify after callback completion
184- defer listener .notifyKey (listener .ctx , string (event .Key ))
187+ key := channels .NewID (string (event .Key ), principalDocCollectionIDForChannelID )
188+ defer listener .notifyKey (listener .ctx , key )
185189 }
186190
187191 listener .OnDocChanged (event , docType )
@@ -251,7 +255,7 @@ func (listener *changeListener) Notify(ctx context.Context, keys channels.Set) {
251255 listener .tapNotifier .L .Lock ()
252256 listener .counter ++
253257 for key := range keys {
254- listener .keyCounts [key . String () ] = listener .counter
258+ listener .keyCounts [key ] = listener .counter
255259 }
256260 base .DebugfCtx (ctx , base .KeyChanges , "Listener keys %q for %s have changed, count=%d" ,
257261 base .UD (keys ), base .MD (listener .bucketName ), listener .counter )
@@ -309,7 +313,7 @@ func tickerValForBroadcastSpeed(skippedSequencePresent bool) time.Duration {
309313}
310314
311315// Changes the counter, notifying waiting clients. Only use for a key update.
312- func (listener * changeListener ) notifyKey (ctx context.Context , key string ) {
316+ func (listener * changeListener ) notifyKey (ctx context.Context , key channels. ID ) {
313317 listener .tapNotifier .L .Lock ()
314318 listener .counter ++
315319 listener .keyCounts [key ] = listener .counter
@@ -340,7 +344,7 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k
340344}
341345
342346// Waits until either the counter, or terminateCheckCounter exceeds the given value. Returns the new counters.
343- func (listener * changeListener ) Wait (ctx context.Context , keys []string , counter uint64 , terminateCheckCounter uint64 ) (uint64 , uint64 ) {
347+ func (listener * changeListener ) Wait (ctx context.Context , keys []channels. ID , counter uint64 , terminateCheckCounter uint64 ) (uint64 , uint64 ) {
344348 listener .tapNotifier .L .Lock ()
345349 defer listener .tapNotifier .L .Unlock ()
346350 base .DebugfCtx (ctx , base .KeyChanges , "No new changes to send to change listener. Waiting for %q's count to pass %d" ,
@@ -367,13 +371,13 @@ func (listener *changeListener) Wait(ctx context.Context, keys []string, counter
367371}
368372
369373// Returns the max value of the counter for all the given keys
370- func (listener * changeListener ) CurrentCount (keys []string ) uint64 {
374+ func (listener * changeListener ) CurrentCount (keys []channels. ID ) uint64 {
371375 listener .tapNotifier .L .Lock ()
372376 defer listener .tapNotifier .L .Unlock ()
373377 return listener ._currentCount (keys )
374378}
375379
376- func (listener * changeListener ) _currentCount (keys []string ) uint64 {
380+ func (listener * changeListener ) _currentCount (keys []channels. ID ) uint64 {
377381 var max uint64 = 0
378382 for _ , key := range keys {
379383 if count := listener .keyCounts [key ]; count > max {
@@ -389,23 +393,23 @@ func (listener *changeListener) _currentCount(keys []string) uint64 {
389393// listener's counter to increment from the value at the last call.
390394type ChangeWaiter struct {
391395 listener * changeListener
392- keys []string
393- userKeys []string
396+ keys []channels. ID
397+ userKeys []channels. ID
394398 lastCounter uint64
395399 lastTerminateCheckCounter uint64
396400 lastUserCount uint64
397401 trackUnusedSequences bool // track unused sequences in Wait functions
398402}
399403
400404// NewWaiter a new ChangeWaiter that will wait for changes for the given document keys, and will optionally track unused sequences.
401- func (listener * changeListener ) NewWaiter (keys []string , trackUnusedSequences bool ) * ChangeWaiter {
405+ func (listener * changeListener ) NewWaiter (keys []channels. ID , trackUnusedSequences bool ) * ChangeWaiter {
402406 listener .tapNotifier .L .Lock ()
403407 defer listener .tapNotifier .L .Unlock ()
404408 return listener ._newWaiter (keys , trackUnusedSequences )
405409}
406410
407411// _newWaiter a new ChangeWaiter that will wait for changes for the given document keys, and will optionally track unused sequences.
408- func (listener * changeListener ) _newWaiter (keys []string , trackUnusedSequences bool ) * ChangeWaiter {
412+ func (listener * changeListener ) _newWaiter (keys []channels. ID , trackUnusedSequences bool ) * ChangeWaiter {
409413 return & ChangeWaiter {
410414 listener : listener ,
411415 keys : keys ,
@@ -417,15 +421,16 @@ func (listener *changeListener) _newWaiter(keys []string, trackUnusedSequences b
417421
418422// NewWaiterWithChannels creates ChangeWaiter for a given channel and user, and will optionally track unused sequences.
419423func (listener * changeListener ) NewWaiterWithChannels (chans channels.Set , user auth.User , trackUnusedSequences bool ) * ChangeWaiter {
420- waitKeys := make ([]string , 0 , 5 )
424+ waitKeys := make ([]channels. ID , 0 , 5 )
421425 for channel := range chans {
422- waitKeys = append (waitKeys , channel . String () )
426+ waitKeys = append (waitKeys , channel )
423427 }
424- var userKeys []string
428+ var userKeys []channels. ID
425429 if user != nil {
426- userKeys = []string {listener .metaKeys .UserKey (user .Name ())}
430+ usrID := channels .NewID (listener .metaKeys .UserKey (user .Name ()), principalDocCollectionIDForChannelID )
431+ userKeys = []channels.ID {usrID }
427432 for role := range user .RoleNames () {
428- userKeys = append (userKeys , listener .metaKeys .RoleKey (role ))
433+ userKeys = append (userKeys , channels . NewID ( listener .metaKeys .RoleKey (role ), principalDocCollectionIDForChannelID ))
429434 }
430435 waitKeys = append (waitKeys , userKeys ... )
431436 }
@@ -480,12 +485,12 @@ func (waiter *ChangeWaiter) RefreshUserCount() bool {
480485func (waiter * ChangeWaiter ) UpdateChannels (collectionID uint32 , timedSet channels.TimedSet ) {
481486 // This capacity is not right can not accommodate channels without iteration.
482487 initialCapacity := len (waiter .userKeys )
483- updatedKeys := make ([]string , 0 , initialCapacity )
488+ updatedKeys := make ([]channels. ID , 0 , initialCapacity )
484489 for channelName , _ := range timedSet {
485- updatedKeys = append (updatedKeys , channels .NewID (channelName , collectionID ). String () )
490+ updatedKeys = append (updatedKeys , channels .NewID (channelName , collectionID ))
486491 }
487492 if waiter .trackUnusedSequences {
488- updatedKeys = append (updatedKeys , unusedSeqChannelID . String () )
493+ updatedKeys = append (updatedKeys , unusedSeqChannelID )
489494 }
490495 if len (waiter .userKeys ) > 0 {
491496 updatedKeys = append (updatedKeys , waiter .userKeys ... )
@@ -505,9 +510,9 @@ func (waiter *ChangeWaiter) RefreshUserKeys(user auth.User, metaKeys *base.Metad
505510 if len (waiter .userKeys ) == 1 && len (user .RoleNames ()) == 0 {
506511 return
507512 }
508- waiter .userKeys = []string { metaKeys .UserKey (user .Name ())}
513+ waiter .userKeys = []channels. ID { channels . NewID ( metaKeys .UserKey (user .Name ()), principalDocCollectionIDForChannelID )}
509514 for role := range user .RoleNames () {
510- waiter .userKeys = append (waiter .userKeys , metaKeys .RoleKey (role ))
515+ waiter .userKeys = append (waiter .userKeys , channels . NewID ( metaKeys .RoleKey (role ), principalDocCollectionIDForChannelID ))
511516 }
512517 waiter .lastUserCount = waiter .listener .CurrentCount (waiter .userKeys )
513518
0 commit comments