Skip to content

Commit dc06fb0

Browse files
committed
Unskip tests
1 parent e83d779 commit dc06fb0

14 files changed

+54
-113
lines changed

base/dcp_client_test.go

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,6 @@ func (dc *GoCBDCPClient) forceRollbackvBucket(uuid gocbcore.VbUUID) {
424424

425425
// TestResumeStoppedFeed uses persisted metadata to resume the feed
426426
func TestResumeStoppedFeed(t *testing.T) {
427-
//SetUpTestLogging(t, LevelDebug, KeyAll)
428427
ctx := TestCtx(t)
429428
bucket := GetTestBucket(t)
430429
defer bucket.Close(ctx)
@@ -628,36 +627,14 @@ func TestDCPOutOfRangeSequence(t *testing.T) {
628627

629628
}
630629

631-
func getCollectionIDs(t *testing.T, bucket *TestBucket) []uint32 {
632-
collection, err := AsCollection(bucket.GetSingleDataStore())
633-
require.NoError(t, err)
634-
635-
var collectionIDs []uint32
636-
if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) {
637-
collectionIDs = append(collectionIDs, collection.GetCollectionID())
638-
}
639-
return collectionIDs
640-
641-
}
642-
643630
func TestDCPFeedEventTypes(t *testing.T) {
644-
TestRequiresGocbDCPClient(t)
645-
646631
ctx := TestCtx(t)
647632
bucket := GetTestBucket(t)
648633
defer bucket.Close(ctx)
649634

650635
collection := bucket.GetSingleDataStore()
651636

652637
collectionNames := CollectionNames{collection.ScopeName(): []string{collection.CollectionName()}}
653-
// start one shot feed
654-
var collectionIDs []uint32
655-
if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) {
656-
collectionIDs = append(collectionIDs, collection.GetCollectionID())
657-
}
658-
659-
gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
660-
require.NoError(t, err)
661638

662639
foundEvent := make(chan struct{})
663640
docID := t.Name()
@@ -696,7 +673,7 @@ func TestDCPFeedEventTypes(t *testing.T) {
696673
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
697674
Callback: callback,
698675
}
699-
dcpClient, err := NewDCPClient(ctx, gocbv2Bucket, clientOptions)
676+
dcpClient, err := NewDCPClient(ctx, bucket, clientOptions)
700677
require.NoError(t, err)
701678

702679
doneChan, startErr := dcpClient.Start(ctx)
@@ -738,9 +715,7 @@ func TestDCPFeedEventTypes(t *testing.T) {
738715
}
739716

740717
func TestDCPClientAgentConfig(t *testing.T) {
741-
if UnitTestUrlIsWalrus() {
742-
t.Skip("exercises gocbcore code")
743-
}
718+
TestRequiresGocbDCPClient(t)
744719
ctx := TestCtx(t)
745720
bucket := GetTestBucket(t)
746721
defer bucket.Close(ctx)

base/dcp_sharded.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ type CbgtContext struct {
6464
// StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket.
6565
// dbName is used to define a unique path name for local file storage of pindex files
6666
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) {
67-
fmt.Printf("cfg=%+v\n", cfg)
6867
// Ensure we don't try to start collections-enabled feed if there are any pre-collection SG nodes in the cluster.
6968
minVersion, err := getMinNodeVersion(cfg)
7069
if err != nil {

base/util_testing.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -758,24 +758,10 @@ func TestRequiresCollections(t testing.TB) {
758758
}
759759
}
760760

761-
// TestRequiresOneShotDCPClient will skip the current test until rosmar supports one-shot DCP.
762-
func TestRequiresOneShotDCPClient(t testing.TB) {
763-
if UnitTestUrlIsWalrus() {
764-
t.Skip("rosmar doesn't have an abstracted one shot DCP client CBG-4246")
765-
}
766-
}
767-
768-
// TestRequiresDCPResync will skip the current test if DCP resync is not supported.
769-
func TestRequiresDCPResync(t testing.TB) {
770-
if UnitTestUrlIsWalrus() {
771-
t.Skip("Walrus doesn't support DCP resync CBG-2661/CBG-4246")
772-
}
773-
}
774-
775761
// TestRequiresGocbDCPClient will skip the current test if using rosmar.
776762
func TestRequiresGocbDCPClient(t testing.TB) {
777763
if UnitTestUrlIsWalrus() {
778-
t.Skip("rosmar doesn't support base.DCPClient")
764+
t.Skip("rosmar doesn't support GocbDCPClient")
779765
}
780766
}
781767

db/attachment_compaction.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const (
2929
CleanupPhase = "cleanup"
3030
)
3131

32-
func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, terminator *base.SafeTerminator, markedAttachmentCount *base.AtomicInt) (count int64, vbUUIDs []uint64, checkpointPrefix string, err error) {
32+
func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, terminator *base.SafeTerminator, markedAttachmentCount *base.AtomicInt) (count int64, vbUUIDs []uint64, checkpointPrefix string, feedPrefix string, err error) {
3333
base.InfofCtx(ctx, base.KeyAll, "Starting first phase of attachment compaction (mark phase) with compactionID: %q", compactionID)
3434
compactionLoggingID := "Compaction Mark: " + compactionID
3535

@@ -138,43 +138,43 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c
138138
dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions)
139139
if err != nil {
140140
base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err)
141-
return 0, nil, "", err
141+
return 0, nil, "", "", err
142142
}
143143
metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix()
144144

145145
doneChan, err := dcpClient.Start(ctx)
146146
if err != nil {
147147
base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err)
148148
dcpClient.Close()
149-
return 0, nil, metadataKeyPrefix, err
149+
return 0, nil, metadataKeyPrefix, clientOptions.FeedPrefix, err
150150
}
151151
base.DebugfCtx(ctx, base.KeyAll, "[%s] DCP feed started.", compactionLoggingID)
152152

153153
select {
154154
case <-doneChan:
155155
base.InfofCtx(ctx, base.KeyAll, "[%s] Mark phase of attachment compaction completed. Marked %d attachments", compactionLoggingID, markedAttachmentCount.Value())
156156
if markProcessFailureErr != nil {
157-
return markedAttachmentCount.Value(), nil, metadataKeyPrefix, markProcessFailureErr
157+
return markedAttachmentCount.Value(), nil, metadataKeyPrefix, clientOptions.FeedPrefix, markProcessFailureErr
158158
}
159159
case <-terminator.Done():
160160
base.DebugfCtx(ctx, base.KeyAll, "[%s] Terminator closed. Stopping mark phase.", compactionLoggingID)
161161
dcpClient.Close()
162162
if markProcessFailureErr != nil {
163-
return markedAttachmentCount.Value(), nil, metadataKeyPrefix, markProcessFailureErr
163+
return markedAttachmentCount.Value(), nil, metadataKeyPrefix, clientOptions.FeedPrefix, markProcessFailureErr
164164
}
165165
if err != nil {
166-
return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, err
166+
return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, clientOptions.FeedPrefix, err
167167
}
168168

169169
err = <-doneChan
170170
if err != nil {
171-
return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, err
171+
return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, clientOptions.FeedPrefix, err
172172
}
173173

174174
base.InfofCtx(ctx, base.KeyAll, "[%s] Mark phase of attachment compaction was terminated. Marked %d attachments", compactionLoggingID, markedAttachmentCount.Value())
175175
}
176176

177-
return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, err
177+
return markedAttachmentCount.Value(), base.GetVBUUIDs(dcpClient.GetMetadata()), metadataKeyPrefix, clientOptions.FeedPrefix, err
178178
}
179179

180180
// AttachmentsMetaMap struct is a very minimal struct to unmarshal into when getting attachments from bodies
@@ -406,7 +406,7 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,
406406
return purgedAttachmentCount.Value(), err
407407
}
408408

409-
func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (string, error) {
409+
func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (checkpointPrefix string, feedPrefix string, err error) {
410410
base.InfofCtx(ctx, base.KeyAll, "Starting third phase of attachment compaction (cleanup phase) with compactionID: %q", compactionID)
411411
compactionLoggingID := "Compaction Cleanup: " + compactionID
412412

@@ -495,23 +495,18 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
495495

496496
base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for cleanup phase of attachment compaction", compactionLoggingID)
497497

498-
bucket, err := base.AsGocbV2Bucket(db.Bucket)
499-
if err != nil {
500-
return "", err
501-
}
502-
503-
dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions)
498+
dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions)
504499
if err != nil {
505500
base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err)
506-
return "", err
501+
return "", "", err
507502
}
508503
metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix()
509504

510505
doneChan, err := dcpClient.Start(ctx)
511506
if err != nil {
512507
base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err)
513508
dcpClient.Close()
514-
return metadataKeyPrefix, err
509+
return metadataKeyPrefix, clientOptions.FeedPrefix, err
515510
}
516511

517512
select {
@@ -521,13 +516,13 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
521516
dcpClient.Close()
522517
err = <-doneChan
523518
if err != nil {
524-
return metadataKeyPrefix, err
519+
return metadataKeyPrefix, clientOptions.FeedPrefix, err
525520
}
526521

527522
base.InfofCtx(ctx, base.KeyAll, "[%s] Cleanup phase of attachment compaction was terminated", compactionLoggingID)
528523
}
529524

530-
return metadataKeyPrefix, err
525+
return metadataKeyPrefix, clientOptions.FeedPrefix, err
531526
}
532527

533528
// getCompactionIDSubDocPath is just a tiny helper func that just concatenates the subdoc path we're using to store

db/attachment_compaction_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
)
2626

2727
func testRequiresRosmarHierachicalSubdocOps(t *testing.T) {
28-
t.Skip("hierachical subdoc operations are not supported by rosmar yet - CBG-4232")
28+
t.Skip("hierarchical subdoc operations are not supported by rosmar yet - CBG-4232")
2929
}
3030

3131
func TestAttachmentMark(t *testing.T) {
@@ -62,7 +62,7 @@ func TestAttachmentMark(t *testing.T) {
6262
attKeys = append(attKeys, createDocWithInBodyAttachment(t, ctx, "inBodyDoc", []byte(`{}`), "attForInBodyRef", []byte(`{"val": "inBodyAtt"}`), databaseCollection))
6363

6464
terminator := base.NewSafeTerminator()
65-
attachmentsMarked, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{})
65+
attachmentsMarked, _, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{})
6666
assert.NoError(t, err)
6767
assert.Equal(t, int64(13), attachmentsMarked)
6868

@@ -202,7 +202,7 @@ func TestAttachmentCleanup(t *testing.T) {
202202
}
203203

204204
terminator := base.NewSafeTerminator()
205-
_, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator)
205+
_, _, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator)
206206
assert.NoError(t, err)
207207

208208
for _, docID := range singleMarkedAttIDs {
@@ -357,7 +357,7 @@ func TestAttachmentMarkAndSweepAndCleanup(t *testing.T) {
357357
}
358358

359359
terminator := base.NewSafeTerminator()
360-
attachmentsMarked, vbUUIDS, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{})
360+
attachmentsMarked, vbUUIDS, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, t.Name(), terminator, &base.AtomicInt{})
361361
assert.NoError(t, err)
362362
assert.Equal(t, int64(10), attachmentsMarked)
363363

@@ -382,7 +382,7 @@ func TestAttachmentMarkAndSweepAndCleanup(t *testing.T) {
382382
}
383383
}
384384

385-
_, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator)
385+
_, _, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator)
386386
assert.NoError(t, err)
387387

388388
for _, attDocKey := range attKeys {
@@ -696,7 +696,7 @@ func TestAttachmentDifferentVBUUIDsBetweenPhases(t *testing.T) {
696696

697697
// Run mark phase as usual
698698
terminator := base.NewSafeTerminator()
699-
_, vbUUIDs, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDB, t.Name(), terminator, &base.AtomicInt{})
699+
_, vbUUIDs, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDB, t.Name(), terminator, &base.AtomicInt{})
700700
assert.NoError(t, err)
701701

702702
// Manually modify a vbUUID and ensure the Sweep phase errors
@@ -975,7 +975,7 @@ func TestAttachmentCompactIncorrectStat(t *testing.T) {
975975
stat := &base.AtomicInt{}
976976
count := int64(0)
977977
go func() {
978-
attachmentCount, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, "mark", terminator, stat)
978+
attachmentCount, _, _, _, err := attachmentCompactMarkPhase(ctx, dataStore, collectionID, testDb, "mark", terminator, stat)
979979
atomic.StoreInt64(&count, attachmentCount)
980980
require.NoError(t, err)
981981
}()

db/background_mgr_attachment_compaction.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,10 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin
133133
a.SetPhase("mark")
134134
worker := func() (shouldRetry bool, err error, value any) {
135135
persistClusterStatus()
136-
_, a.VBUUIDs, metadataKeyPrefix, err = attachmentCompactMarkPhase(ctx, dataStore, collectionID, database, a.CompactID, terminator, &a.MarkedAttachments)
136+
var feedPrefix string
137+
_, a.VBUUIDs, metadataKeyPrefix, feedPrefix, err = attachmentCompactMarkPhase(ctx, dataStore, collectionID, database, a.CompactID, terminator, &a.MarkedAttachments)
137138
if err != nil {
138-
shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, metadataKeyPrefix)
139+
shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, metadataKeyPrefix, feedPrefix)
139140
}
140141
return shouldRetry, err, nil
141142
}
@@ -161,9 +162,10 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin
161162
a.SetPhase("cleanup")
162163
worker := func() (shouldRetry bool, err error, value any) {
163164
persistClusterStatus()
164-
metadataKeyPrefix, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator)
165+
var feedPrefix string
166+
metadataKeyPrefix, feedPrefix, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator)
165167
if err != nil {
166-
shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, metadataKeyPrefix)
168+
shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, metadataKeyPrefix, feedPrefix)
167169
}
168170
return shouldRetry, err, nil
169171
}
@@ -182,13 +184,13 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin
182184
return nil
183185
}
184186

185-
func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase, keyPrefix string) (bool, error) {
187+
func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase, checkpointPrefix, feedPrefix string) (bool, error) {
186188
var rollbackErr gocbcore.DCPRollbackError
187189
if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) {
188190
base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase)
189191
// to rollback any phase for attachment compaction we need to purge all persisted dcp metadata
190192
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID)
191-
err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID)
193+
err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, checkpointPrefix, feedPrefix)
192194
if err != nil {
193195
base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err)
194196
return false, err

db/background_mgr_attachment_migration.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
165165
dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback)
166166

167167
// check for mismatch in collection id's between current collections on the db and prev run
168-
err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, currCollectionIDs)
168+
err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, dcpOptions.FeedPrefix, currCollectionIDs)
169169
if err != nil {
170170
return err
171171
}
@@ -315,15 +315,15 @@ func getAttachmentMigrationPrefix(migrationID string) string {
315315
}
316316

317317
// resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run
318-
func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) error {
318+
func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string, collectionIDs []uint32) error {
319319
// if we are on our first run, no collections will be defined on the manager yet
320320
if len(a.CollectionIDs) == 0 {
321321
return nil
322322
}
323323
if len(a.CollectionIDs) != len(collectionIDs) {
324324
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID)
325-
err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID)
326-
if err != nil {
325+
err := PurgeDCPCheckpoints(ctx, database, checkpointPrefix, feedPrefix)
326+
if err != nil && !base.IsDocNotFoundError(err) {
327327
return err
328328
}
329329
return nil
@@ -333,7 +333,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex
333333
purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs)
334334
if purgeNeeded != 0 {
335335
base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID)
336-
err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID)
336+
err := PurgeDCPCheckpoints(ctx, database, checkpointPrefix, feedPrefix)
337337
if err != nil {
338338
return err
339339
}

db/database.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2466,7 +2466,7 @@ func (db *DatabaseContext) StartOnlineProcesses(ctx context.Context) (returnedEr
24662466

24672467
db.AttachmentMigrationManager = NewAttachmentMigrationManager(db)
24682468
// if we have collections requiring migration, run the job
2469-
if len(db.RequireAttachmentMigration) > 0 && !db.BucketSpec.IsWalrusBucket() {
2469+
if len(db.RequireAttachmentMigration) > 0 {
24702470
err := db.AttachmentMigrationManager.Start(ctx, nil)
24712471
if err != nil {
24722472
base.WarnfCtx(ctx, "Error trying to migrate attachments for %s with error: %v", db.Name, err)
@@ -2572,11 +2572,21 @@ func (db *DatabaseContext) GetCollectionIDs() []uint32 {
25722572
}
25732573

25742574
// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0
2575-
func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, taskID string) error {
2575+
func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string) error {
25762576

25772577
bucket, err := base.AsGocbV2Bucket(database.Bucket)
25782578
if err != nil {
2579-
return err
2579+
checkpoint := checkpointPrefix + ":" + feedPrefix
2580+
fmt.Printf("Deleting DCP checkpoint %q\n", checkpoint)
2581+
err := database.MetadataStore.Delete(checkpoint)
2582+
if err != nil && !base.IsDocNotFoundError(err) {
2583+
return err
2584+
}
2585+
if base.IsDocNotFoundError(err) {
2586+
fmt.Printf("No DCP checkpoint found %q\n", checkpoint)
2587+
return nil
2588+
}
2589+
return nil
25802590
}
25812591
numVbuckets, err := bucket.GetMaxVbno()
25822592
if err != nil {

rest/access_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,6 @@ func TestAllDocsAccessControl(t *testing.T) {
760760
}
761761

762762
func TestChannelAccessChanges(t *testing.T) {
763-
base.TestRequiresDCPResync(t)
764763
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCache, base.KeyChanges, base.KeyCRUD)
765764
rtConfig := RestTesterConfig{SyncFn: `function(doc) {access(doc.owner, doc._id);channel(doc.channel)}`, PersistentConfig: true}
766765
rt := NewRestTester(t, &rtConfig)

0 commit comments

Comments
 (0)