-
Notifications
You must be signed in to change notification settings - Fork 140
CBG-4249: Create an abstract DCPClient to be able to work with rosmar #7879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces an abstract DCPClient interface to support both Couchbase Server (via gocbcore) and Rosmar backends. The key change is creating a unified DCP client abstraction that dispatches to implementation-specific clients based on the underlying bucket type.
Key changes:
- Created
DCPClientinterface with implementationsGoCBDCPClient(renamed fromDCPClient) andRosmarDCPClient - Unified
DCPClientOptionsstruct replacing separate options for each implementation - Removed test skips for Rosmar/Walrus, enabling DCP-based tests to run against all bucket types
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| base/abstract_dcp_client.go | New abstraction layer with DCPClient interface and factory function NewDCPClient |
| base/rosmar_dcp_client.go | New Rosmar-specific DCP client implementation |
| base/dcp_client.go | Renamed DCPClient to GoCBDCPClient and DCPClientOptions to GoCBDCPClientOptions |
| base/dcp_client_stream_observer.go | Updated receiver types from DCPClient to GoCBDCPClient |
| base/gocb_dcp_feed.go | Refactored to use new client creation pattern |
| db/background_mgr_resync_dcp.go | Updated to use new abstract client with scope-based collection specification |
| db/background_mgr_attachment_migration.go | Updated to use new abstract client with scope-based collection specification |
| db/attachment_compaction.go | Updated to use new abstract client with scope-based collection specification |
| db/util_testing.go | Updated to use new abstract client with scope-based collection specification |
| db/background_mgr_resync_dcp_test.go | Removed Walrus test skips |
| db/background_mgr_attachment_migration_test.go | Removed Walrus test skips |
| db/attachment_compaction_test.go | Removed Walrus test skips |
| base/dcp_client_test.go | Removed Walrus test skips and commented out unported tests |
| tools/cache_perf_tool/dcpDataGeneration.go | Updated type references to GoCBDCPClient |
ec497ab to
8aa5326
Compare
8aa5326 to
dc06fb0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 33 out of 33 changed files in this pull request and generated 9 comments.
adamcfraser
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few initial comments, haven't done a full review.
| InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName)) | ||
| go func() { | ||
| select { | ||
| case err := <-doneChan: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that for a one-shot feed, a closed doneChan wasn't an error case.
| // Close is used externally to stop the DCP client. | ||
| func (dc *GoCBDCPClient) Close() { | ||
| dc.close() | ||
| return dc.getCloseError() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not care about reporting any potential close error, or is that being done elsewhere?
| var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set") | ||
|
|
||
| type DCPClient struct { | ||
| type GoCBDCPClient struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick, take it or leave it, but I think GocbDCPClient and GocbDCPClientOptions are more readable.
| bucket, err := base.AsGocbV2Bucket(database.Bucket) | ||
| if err != nil { | ||
| return err | ||
| checkpoint := checkpointPrefix + ":" + feedPrefix |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this Rosmar-specific handling - it seems to require some knowledge of the naming convention for rosmar DCP checkpoints. Seems like an unexpected place for rosmar-specific code.
bbrks
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dropping the few comments I had from yesterday's review
| FailOnRollback: false, | ||
| CollectionIDs: slices.Collect(maps.Keys(collections)), | ||
| MetadataStoreType: base.DCPMetadataStoreInMemory, | ||
| collectionNames.Add(dataStoreName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since Add takes a variadic and Add itself is doing a reasonable amount of work to handle the de-dupe, it might make more sense to push this down outside of the loop?
| for _, d := range ds { | ||
| if _, ok := c[d.ScopeName()]; !ok { | ||
| c[d.ScopeName()] = []string{} | ||
| } else if slices.Contains(c[d.ScopeName()], d.CollectionName()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really like the slices.Contains inside the ds iteration - especially thinking about in the future when we're considering increasing the collection limits.
It kind of feels like this would be better suited as a map[string]map[string]struct{} or similar to avoid the slice iterations at insertion time, and handle dedupe.
|
|
||
| func getResyncDCPPrefix(resyncID string) string { | ||
| return fmt.Sprintf( | ||
| "sg-%v:resync:%v", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should decide on whether CBG-4286 means we should make this prefix sg: or one that includes a version. I would be inclined to say sg: and then group version numbers in the trailing part of the name.
Create an abstract DCPClient to be able to work with rosmar
I expect that this can be extended to work as a sharded or non shared DCP client as well, but I think there's interesting work here.
I made this "work" but not for attachment compaction since this uses hierarchical paths in rosmar for xattr subdoc operations https://jira.issues.couchbase.com/browse/CBG-4232
TODO
Pre-review checklist
fmt.Print,log.Print, ...)base.UD(docID),base.MD(dbName))docs/apiDependencies (if applicable)
Integration Tests
GSI=true,xattrs=truehttps://jenkins.sgwdev.com/job/SyncGatewayIntegration/0000/