@@ -162,29 +162,24 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
162162 if err != nil {
163163 return err
164164 }
165- dcpFeedKey , err := GenerateAttachmentMigrationDCPStreamName ( a .MigrationID )
165+ dcpOptions , err := getMigrationDCPClientOptions ( db , a .MigrationID , scopes , callback )
166166 if err != nil {
167167 return err
168168 }
169- dcpPrefix := db .MetadataKeys .DCPCheckpointPrefix (db .Options .GroupID )
170169
171170 // check for mismatch in collection id's between current collections on the db and prev run
172- checkpointPrefix := fmt .Sprintf ("%s:%v" , dcpPrefix , dcpFeedKey )
173- err = a .resetDCPMetadataIfNeeded (ctx , db , checkpointPrefix , currCollectionIDs )
171+ err = a .resetDCPMetadataIfNeeded (ctx , db , dcpOptions .CheckpointPrefix , currCollectionIDs )
174172 if err != nil {
175173 return err
176174 }
177175
178176 a .SetCollectionIDs (currCollectionIDs )
179- dcpOptions := getMigrationDCPClientOptions (scopes , dcpPrefix )
180- dcpOptions .ID = dcpFeedKey
181- dcpOptions .Callback = callback
182177 dcpClient , err := base .NewDCPClient (ctx , db .Bucket , dcpOptions )
183178 if err != nil {
184179 base .WarnfCtx (ctx , "[%s] Failed to create attachment migration DCP client: %v" , migrationLoggingID , err )
185180 return err
186181 }
187- base .DebugfCtx (ctx , base .KeyAll , "[%s] Starting DCP feed %q for attachment migration" , migrationLoggingID , dcpFeedKey )
182+ base .DebugfCtx (ctx , base .KeyAll , "[%s] Starting DCP feed for attachment migration" , migrationLoggingID )
188183
189184 doneChan , err := dcpClient .Start (ctx )
190185 if err != nil {
@@ -283,14 +278,22 @@ func (a *AttachmentMigrationManager) GetProcessStatus(status BackgroundManagerSt
283278 return statusJSON , metaJSON , err
284279}
285280
286- func getMigrationDCPClientOptions (scopes base.CollectionNames , prefix string ) base.DCPClientOptions {
281+ func getMigrationDCPClientOptions (db * DatabaseContext , migrationID string , scopes base.CollectionNames , callback sgbucket.FeedEventCallbackFunc ) (base.DCPClientOptions , error ) {
282+ prefix := getAttachmentMigrationPrefix (migrationID )
283+
284+ feedID , err := base .GenerateDcpStreamName (prefix )
285+ if err != nil {
286+ return base.DCPClientOptions {}, err
287+ }
287288 return base.DCPClientOptions {
289+ ID : feedID ,
288290 OneShot : true ,
289291 FailOnRollback : false ,
290292 MetadataStoreType : base .DCPMetadataStoreCS ,
291293 CollectionNames : scopes ,
292- CheckpointPrefix : prefix ,
293- }
294+ CheckpointPrefix : fmt .Sprintf ("%s:%v" , db .MetadataKeys .DCPCheckpointPrefix (db .Options .GroupID ), prefix ),
295+ Callback : callback ,
296+ }, nil
294297}
295298
296299type AttachmentMigrationManagerResponse struct {
@@ -310,12 +313,12 @@ type AttachmentMigrationManagerStatusDoc struct {
310313 AttachmentMigrationMeta `json:"meta"`
311314}
312315
313- // GenerateAttachmentMigrationDCPStreamName returns the DCP stream name for attachment migration job .
314- func GenerateAttachmentMigrationDCPStreamName (migrationID string ) ( string , error ) {
315- return base . GenerateDcpStreamName ( fmt .Sprintf (
316+ // getAttachmentMigrationPrefix returns a prefix for identifying attachment migration dcp feed and checkpoints .
317+ func getAttachmentMigrationPrefix (migrationID string ) string {
318+ return fmt .Sprintf (
316319 "sg-%v:att_migration:%v" ,
317320 base .ProductAPIVersion ,
318- migrationID ))
321+ migrationID )
319322}
320323
321324// resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run
0 commit comments