@@ -29,7 +29,7 @@ const (
2929 CleanupPhase = "cleanup"
3030)
3131
32- func attachmentCompactMarkPhase (ctx context.Context , dataStore base.DataStore , collectionID uint32 , db * Database , compactionID string , terminator * base.SafeTerminator , markedAttachmentCount * base.AtomicInt ) (count int64 , vbUUIDs []uint64 , checkpointPrefix string , err error ) {
32+ func attachmentCompactMarkPhase (ctx context.Context , dataStore base.DataStore , collectionID uint32 , db * Database , compactionID string , terminator * base.SafeTerminator , markedAttachmentCount * base.AtomicInt ) (count int64 , vbUUIDs []uint64 , checkpointPrefix string , feedPrefix string , err error ) {
3333 base .InfofCtx (ctx , base .KeyAll , "Starting first phase of attachment compaction (mark phase) with compactionID: %q" , compactionID )
3434 compactionLoggingID := "Compaction Mark: " + compactionID
3535
@@ -138,43 +138,43 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c
138138 dcpClient , err := base .NewDCPClient (ctx , db .Bucket , clientOptions )
139139 if err != nil {
140140 base .WarnfCtx (ctx , "[%s] Failed to create attachment compaction DCP client! %v" , compactionLoggingID , err )
141- return 0 , nil , "" , err
141+ return 0 , nil , "" , "" , err
142142 }
143143 metadataKeyPrefix := dcpClient .GetMetadataKeyPrefix ()
144144
145145 doneChan , err := dcpClient .Start (ctx )
146146 if err != nil {
147147 base .WarnfCtx (ctx , "[%s] Failed to start attachment compaction DCP feed! %v" , compactionLoggingID , err )
148148 dcpClient .Close ()
149- return 0 , nil , metadataKeyPrefix , err
149+ return 0 , nil , metadataKeyPrefix , clientOptions . FeedPrefix , err
150150 }
151151 base .DebugfCtx (ctx , base .KeyAll , "[%s] DCP feed started." , compactionLoggingID )
152152
153153 select {
154154 case <- doneChan :
155155 base .InfofCtx (ctx , base .KeyAll , "[%s] Mark phase of attachment compaction completed. Marked %d attachments" , compactionLoggingID , markedAttachmentCount .Value ())
156156 if markProcessFailureErr != nil {
157- return markedAttachmentCount .Value (), nil , metadataKeyPrefix , markProcessFailureErr
157+ return markedAttachmentCount .Value (), nil , metadataKeyPrefix , clientOptions . FeedPrefix , markProcessFailureErr
158158 }
159159 case <- terminator .Done ():
160160 base .DebugfCtx (ctx , base .KeyAll , "[%s] Terminator closed. Stopping mark phase." , compactionLoggingID )
161161 dcpClient .Close ()
162162 if markProcessFailureErr != nil {
163- return markedAttachmentCount .Value (), nil , metadataKeyPrefix , markProcessFailureErr
163+ return markedAttachmentCount .Value (), nil , metadataKeyPrefix , clientOptions . FeedPrefix , markProcessFailureErr
164164 }
165165 if err != nil {
166- return markedAttachmentCount .Value (), base .GetVBUUIDs (dcpClient .GetMetadata ()), metadataKeyPrefix , err
166+ return markedAttachmentCount .Value (), base .GetVBUUIDs (dcpClient .GetMetadata ()), metadataKeyPrefix , clientOptions . FeedPrefix , err
167167 }
168168
169169 err = <- doneChan
170170 if err != nil {
171- return markedAttachmentCount .Value (), base .GetVBUUIDs (dcpClient .GetMetadata ()), metadataKeyPrefix , err
171+ return markedAttachmentCount .Value (), base .GetVBUUIDs (dcpClient .GetMetadata ()), metadataKeyPrefix , clientOptions . FeedPrefix , err
172172 }
173173
174174 base .InfofCtx (ctx , base .KeyAll , "[%s] Mark phase of attachment compaction was terminated. Marked %d attachments" , compactionLoggingID , markedAttachmentCount .Value ())
175175 }
176176
177- return markedAttachmentCount .Value (), base .GetVBUUIDs (dcpClient .GetMetadata ()), metadataKeyPrefix , err
177+ return markedAttachmentCount .Value (), base .GetVBUUIDs (dcpClient .GetMetadata ()), metadataKeyPrefix , clientOptions . FeedPrefix , err
178178}
179179
180180// AttachmentsMetaMap struct is a very minimal struct to unmarshal into when getting attachments from bodies
@@ -406,7 +406,7 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,
406406 return purgedAttachmentCount .Value (), err
407407}
408408
409- func attachmentCompactCleanupPhase (ctx context.Context , dataStore base.DataStore , collectionID uint32 , db * Database , compactionID string , vbUUIDs []uint64 , terminator * base.SafeTerminator ) (string , error ) {
409+ func attachmentCompactCleanupPhase (ctx context.Context , dataStore base.DataStore , collectionID uint32 , db * Database , compactionID string , vbUUIDs []uint64 , terminator * base.SafeTerminator ) (checkpointPrefix string , feedPrefix string , err error ) {
410410 base .InfofCtx (ctx , base .KeyAll , "Starting third phase of attachment compaction (cleanup phase) with compactionID: %q" , compactionID )
411411 compactionLoggingID := "Compaction Cleanup: " + compactionID
412412
@@ -495,23 +495,18 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
495495
496496 base .InfofCtx (ctx , base .KeyAll , "[%s] Starting DCP feed for cleanup phase of attachment compaction" , compactionLoggingID )
497497
498- bucket , err := base .AsGocbV2Bucket (db .Bucket )
499- if err != nil {
500- return "" , err
501- }
502-
503- dcpClient , err := base .NewDCPClient (ctx , bucket , clientOptions )
498+ dcpClient , err := base .NewDCPClient (ctx , db .Bucket , clientOptions )
504499 if err != nil {
505500 base .WarnfCtx (ctx , "[%s] Failed to create attachment compaction DCP client! %v" , compactionLoggingID , err )
506- return "" , err
501+ return "" , "" , err
507502 }
508503 metadataKeyPrefix := dcpClient .GetMetadataKeyPrefix ()
509504
510505 doneChan , err := dcpClient .Start (ctx )
511506 if err != nil {
512507 base .WarnfCtx (ctx , "[%s] Failed to start attachment compaction DCP feed! %v" , compactionLoggingID , err )
513508 dcpClient .Close ()
514- return metadataKeyPrefix , err
509+ return metadataKeyPrefix , clientOptions . FeedPrefix , err
515510 }
516511
517512 select {
@@ -521,13 +516,13 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
521516 dcpClient .Close ()
522517 err = <- doneChan
523518 if err != nil {
524- return metadataKeyPrefix , err
519+ return metadataKeyPrefix , clientOptions . FeedPrefix , err
525520 }
526521
527522 base .InfofCtx (ctx , base .KeyAll , "[%s] Cleanup phase of attachment compaction was terminated" , compactionLoggingID )
528523 }
529524
530- return metadataKeyPrefix , err
525+ return metadataKeyPrefix , clientOptions . FeedPrefix , err
531526}
532527
533528// getCompactionIDSubDocPath is just a tiny helper func that just concatenates the subdoc path we're using to store
0 commit comments