diff --git a/base/dcp_client_test.go b/base/dcp_client_test.go
index f26f674f1a..11a4cd69d6 100644
--- a/base/dcp_client_test.go
+++ b/base/dcp_client_test.go
@@ -12,6 +12,7 @@ import (
 	"bytes"
 	"fmt"
 	"log"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -940,3 +941,116 @@ func TestDCPClientAgentConfig(t *testing.T) {
 		})
 	}
 }
+
+func TestDCPCheckpointCleanup(t *testing.T) {
+	ctx := TestCtx(t)
+	bucket := GetTestBucket(t)
+	defer bucket.Close(ctx)
+
+	allDataStoreNames, err := bucket.ListDataStores()
+	require.NoError(t, err)
+
+	var dataStores []sgbucket.DataStore
+	for _, dsName := range allDataStoreNames {
+		ds, err := bucket.NamedDataStore(dsName)
+		require.NoError(t, err)
+		dataStores = append(dataStores, ds)
+	}
+
+	// create callback
+	var mutationCount uint64
+	foundDocs := make(chan string, len(dataStores))
+	callback := func(event sgbucket.FeedEvent) bool {
+		if strings.HasSuffix(string(event.Key), "_doc") {
+			atomic.AddUint64(&mutationCount, 1)
+			select {
+			case foundDocs <- string(event.Key):
+			default:
+			}
+		}
+		return true // request checkpoint persistence
+	}
+
+	checkpointPrefix := DefaultMetadataKeys.DCPCheckpointPrefix(t.Name())
+	dcpOptions := DCPClientOptions{
+		FeedID:            "testfeed",
+		CollectionNames:   NewCollectionNameSet(allDataStoreNames...),
+		OneShot:           false,
+		CheckpointPrefix:  checkpointPrefix,
+		Callback:          callback,
+		MetadataStoreType: DCPMetadataStoreCS,
+	}
+
+	dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions)
+	require.NoError(t, err)
+
+	// If it's a GoCBDCPClient, we want to speed up checkpointing for the test
+	if dc, ok := dcpClient.(*GoCBDCPClient); ok {
+		dc.checkpointPersistFrequency = Ptr(0 * time.Second)
+	}
+
+	doneChan, startErr := dcpClient.Start()
+	require.NoError(t, startErr)
+
+	defer func() {
+		_ = dcpClient.Close()
+		RequireChanClosed(t, doneChan)
+	}()
+
+	// write document to each collection
+	for _, ds := range dataStores {
+		docID := fmt.Sprintf("%s_%s_%s_doc", t.Name(), ds.ScopeName(), ds.CollectionName())
+		body := map[string]any{"foo": "bar"}
+		err = ds.Set(docID, 0, nil, body)
+		require.NoError(t, err)
+	}
+
+	// wait for docs to be seen
+	for i := 0; i < len(dataStores); i++ {
+		select {
+		case <-foundDocs:
+		case <-time.After(10 * time.Second):
+			t.Fatalf("timeout waiting for docs")
+		}
+	}
+
+	// Verify the checkpoint prefix matches the configured value
+	actualPrefix := dcpClient.GetMetadataKeyPrefix()
+	require.Equal(t, checkpointPrefix, actualPrefix)
+
+	// Close feed
+	err = dcpClient.Close()
+	require.NoError(t, err)
+	RequireChanClosed(t, doneChan)
+
+	// Verify that checkpoint documents were created in the bucket
+	var foundCheckpoints []string
+
+	metadataStore := bucket.Bucket.DefaultDataStore()
+	if !UnitTestUrlIsWalrus() {
+		// Try to find at least one worker's checkpoint
+		for i := 0; i < DefaultNumWorkers; i++ {
+			checkpointID := fmt.Sprintf("%s%d", checkpointPrefix, i)
+			_, _, err := metadataStore.GetRaw(checkpointID)
+			if err == nil {
+				foundCheckpoints = append(foundCheckpoints, checkpointID)
+				t.Logf("Found checkpoint document: %s", checkpointID)
+			}
+		}
+		require.NotEmpty(t, foundCheckpoints, "No checkpoint document found in bucket with prefix: %s", checkpointPrefix)
+	} else {
+		_, _, err := metadataStore.GetRaw(checkpointPrefix)
+		require.NoError(t, err, "Checkpoint document not found %q", checkpointPrefix)
+		foundCheckpoints = append(foundCheckpoints, checkpointPrefix)
+	}
+
+	// Purge checkpoints and verify they are deleted
+	err = dcpClient.PurgeCheckpoints()
+	require.NoError(t, err)
+
+	for _, cp := range foundCheckpoints {
+		_, _, err := metadataStore.GetRaw(cp)
+		require.Error(t, err, "Expected checkpoint document %s to be deleted", cp)
+		RequireDocNotFoundError(t, err)
+	}
+}
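
The test asserts feed shutdown via `RequireChanClosed`. As a rough sketch of the helper this assumes (the real implementation lives elsewhere in the `base` package), it would drain the done channel until close or a timeout:

```go
// A rough sketch, assuming the done channel may deliver a terminal feed error
// before being closed. The timeout guards against a hung feed rather than
// asserting exact shutdown latency.
func requireChanClosed(t *testing.T, ch <-chan error) {
	t.Helper()
	timeout := time.After(10 * time.Second)
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return // channel closed: the feed has fully shut down
			}
			// a terminal error may arrive before close; keep draining
		case <-timeout:
			t.Fatal("channel was not closed before timeout")
		}
	}
}
```
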
diff --git a/base/dcp_common.go b/base/dcp_common.go
index cd4706f0ab..cf3328736c 100644
--- a/base/dcp_common.go
+++ b/base/dcp_common.go
@@ -13,6 +13,7 @@ package base
 import (
 	"bytes"
 	"context"
+	"errors"
 	"expvar"
 	"fmt"
 	"sync"
@@ -22,6 +23,18 @@ import (
 	"github.com/google/uuid"
 )
 
+// DCPFeedMode describes the types of DCP feed that can be run.
+type DCPFeedMode string
+
+const (
+	// DCPFeedGocb represents a single node DCP feed for a Couchbase Server bucket.
+	DCPFeedGocb DCPFeedMode = "gocb"
+	// DCPFeedRosmar represents a DCP feed for a rosmar bucket.
+	DCPFeedRosmar DCPFeedMode = "rosmar"
+	// DCPFeedSharded represents a cbgt-based DCP feed for a Couchbase Server bucket.
+	DCPFeedSharded DCPFeedMode = "cbgt"
+)
+
 // Number of non-checkpoint updates per vbucket required to trigger metadata persistence. Must be greater than zero to avoid
 // retriggering persistence solely based on checkpoint doc echo.
 // Based on ad-hoc testing w/ travel-sample bucket, increasing this value doesn't significantly improve performance, since under load
@@ -282,3 +295,39 @@ func GenerateDcpStreamName(feedID string) (string, error) {
 	}
 	return feedName, nil
 }
+
+// PurgeDCPCheckpoints will purge all DCP metadata from a previous run in a bucket. If the checkpoints are not present, this
+// is not an error.
+func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPrefix string, feedMode DCPFeedMode) error {
+	numVbuckets, err := datastore.GetMaxVbno()
+	if err != nil {
+		return err
+	}
+	switch feedMode {
+	case DCPFeedRosmar:
+		err := datastore.Delete(checkpointPrefix)
+		if err != nil && !IsDocNotFoundError(err) {
+			return err
+		}
+		return nil
+	case DCPFeedGocb:
+		metadata := NewDCPMetadataCS(ctx, datastore, numVbuckets, DefaultNumWorkers, checkpointPrefix)
+		metadata.Purge(ctx, DefaultNumWorkers)
+		return nil
+	case DCPFeedSharded:
+		var errs []error
+		for vbNo := range numVbuckets {
+			checkpointID := fmt.Sprintf("%s%d", checkpointPrefix, vbNo)
+			err := datastore.Delete(checkpointID)
+			if err != nil && !IsDocNotFoundError(err) {
+				errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err))
+			}
+		}
+		if errs != nil {
+			return errors.Join(errs...)
+		}
+		return nil
+	default:
+		return fmt.Errorf("Unrecognized dcp feed mode: %s", feedMode)
+	}
+}
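
`PurgeDCPCheckpoints` keys its deletion strategy off the feed mode, so callers only need to know how the feed was run, not how its metadata was laid out. A minimal caller-side sketch, assuming a metadata `DataStore` and the checkpoint prefix used by a previous feed run:

```go
// Missing checkpoint docs are tolerated by PurgeDCPCheckpoints, so this is
// safe to call unconditionally before starting a fresh feed.
func resetFeedCheckpoints(ctx context.Context, metadataStore base.DataStore, checkpointPrefix string, mode base.DCPFeedMode) error {
	// mode selects the deletion strategy: one doc (rosmar), DefaultNumWorkers
	// docs (gocb), or one doc per vbucket (cbgt/sharded).
	return base.PurgeDCPCheckpoints(ctx, metadataStore, checkpointPrefix, mode)
}
```
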
diff --git a/base/dcp_dest.go b/base/dcp_dest.go
index d285578bfb..14b55bbc9a 100644
--- a/base/dcp_dest.go
+++ b/base/dcp_dest.go
@@ -29,6 +29,7 @@ func init() {
 
 type SGDest interface {
 	cbgt.Dest
+	cbgt.DestEx
 }
 
 // DCPDest implements SGDest (superset of cbgt.Dest) interface to manage updates coming from a
diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go
index 7d285002ae..27b9e5cf7c 100644
--- a/base/dcp_sharded.go
+++ b/base/dcp_sharded.go
@@ -22,9 +22,10 @@ import (
 	"sync"
 	"time"
 
+	"errors"
+
 	"github.com/couchbase/cbgt"
 	sgbucket "github.com/couchbase/sg-bucket"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -233,7 +234,7 @@ func createCBGTIndex(ctx context.Context, c *CbgtContext, opts ShardedDCPOptions
 
 	vbNo, err := opts.Bucket.GetMaxVbno()
 	if err != nil {
-		return errors.Wrapf(err, "Unable to retrieve maxVbNo for bucket %s", MD(opts.Bucket.GetName()).Redact())
+		return RedactErrorf("Unable to retrieve maxVbNo for bucket %s: %w", MD(opts.Bucket.GetName()), err)
 	}
 	numPartitions := opts.NumPartitions
@@ -308,7 +309,7 @@ func getCBGTIndexUUID(manager *cbgt.Manager, indexName string) (previousUUID str
 	_, indexDefsMap, err := manager.GetIndexDefs(true)
 	if err != nil {
-		return "", errors.Wrapf(err, "Error calling CBGT GetIndexDefs() on index: %s", indexName)
+		return "", fmt.Errorf("Error calling CBGT GetIndexDefs() on index: %s: %w", indexName, err)
 	}
 
 	indexDef, ok := indexDefsMap[indexName]
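
Replacing `pkg/errors.Wrapf` with `%w`-based wrapping keeps the cause chain visible to the standard library's `errors.Is`/`errors.As`. A small, self-contained illustration (the sentinel error and index name are invented for the example):

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

func main() {
	// Same wrapping shape as getCBGTIndexUUID after the change.
	wrapped := fmt.Errorf("Error calling CBGT GetIndexDefs() on index: %s: %w", "db0_index", errNotFound)
	fmt.Println(errors.Is(wrapped, errNotFound)) // true: %w preserves the chain
}
```
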
diff --git a/base/dcp_sharded_test.go b/base/dcp_sharded_test.go
index aae73249e7..0f9c333200 100644
--- a/base/dcp_sharded_test.go
+++ b/base/dcp_sharded_test.go
@@ -11,6 +11,7 @@ package base
 import (
 	"context"
 	"errors"
+	"expvar"
 	"fmt"
 	"os"
 	"sync"
@@ -19,6 +20,7 @@ import (
 	"time"
 
 	"github.com/couchbase/cbgt"
+	sgbucket "github.com/couchbase/sg-bucket"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -1562,3 +1564,211 @@ func TestCfgNodePollerDistributed(t *testing.T) {
 		})
 	}
 }
+
+type mockHeartbeater struct{}
+
+func (m *mockHeartbeater) RegisterListener(listener HeartbeatListener) error { return nil }
+func (m *mockHeartbeater) UnregisterListener(name string)                    {}
+func (m *mockHeartbeater) Start(context.Context) error                       { return nil }
+func (m *mockHeartbeater) StartSendingHeartbeats(context.Context) error      { return nil }
+func (m *mockHeartbeater) StartCheckingHeartbeats(context.Context) error     { return nil }
+func (m *mockHeartbeater) Stop(context.Context)                              {}
+
+type mockPIndexImpl struct {
+	dest cbgt.Dest
+}
+
+func (m *mockPIndexImpl) Close() error { return nil }
+
+func TestShardedDCPCheckpointCleanup(t *testing.T) {
+	if UnitTestUrlIsWalrus() {
+		t.Skip("Test requires Couchbase Server bucket")
+	}
+
+	ctx := TestCtx(t)
+	bucket := GetTestBucket(t)
+	defer bucket.Close(ctx)
+
+	// The bucket pool may not pre-create collections in every test environment;
+	// ListDataStores returns whatever is available.
+	allDataStoreNames, err := bucket.ListDataStores()
+	require.NoError(t, err)
+
+	var dataStores []sgbucket.DataStore
+	for _, dsName := range allDataStoreNames {
+		ds, err := bucket.NamedDataStore(dsName)
+		require.NoError(t, err)
+		dataStores = append(dataStores, ds)
+	}
+
+	// Set up a callback and a way to wait for docs
+	var mutationCount uint64
+	doneDocs := make(chan struct{})
+	expectedDocs := len(dataStores)
+
+	var foundDocsLock sync.Mutex
+	type foundDoc struct {
+		dsID  string
+		docID string
+	}
+	var foundDocs []foundDoc
+
+	callback := func(event sgbucket.FeedEvent) bool {
+		if event.Opcode == sgbucket.FeedOpMutation {
+			foundDocsLock.Lock()
+			foundDocs = append(foundDocs, foundDoc{
+				dsID:  fmt.Sprintf("%d", event.CollectionID), // collection ID only; there's no reverse mapping to a DataStore here
+				docID: string(event.Key),
+			})
+			foundDocsLock.Unlock()
+
+			newCount := atomic.AddUint64(&mutationCount, 1)
+			if newCount == uint64(expectedDocs) {
+				select {
+				case <-doneDocs:
+				default:
+					close(doneDocs)
+				}
+			}
+		}
+		return true // Request checkpoint persistence
+	}
+
+	// Convert allDataStoreNames to CollectionNames
+	collections := make(CollectionNames)
+	for _, ds := range allDataStoreNames {
+		collections.Add(ds)
+	}
+
+	// Register DestFactory
+	scopeName, collectionList := ShardedDCPOptions{Collections: collections}.scopeAndCollections()
+	destKey := DestKey(t.Name(), scopeName, collectionList, ShardedDCPFeedTypeImport)
+	checkpointPrefix := "test_sharded_cleanup_cp"
+
+	maxVbno, err := bucket.GetMaxVbno()
+	require.NoError(t, err)
+
+	StoreDestFactory(ctx, destKey, func(rollback func()) (cbgt.Dest, error) {
+		return NewDCPDest(
+			ctx,
+			callback,
+			bucket.GetMetadataStore(),
+			maxVbno,
+			true, // persistCheckpoints
+			expvar.NewMap(t.Name()+"_stats"),
+			nil, // partitionStat
+			checkpointPrefix,
+		)
+	})
+	defer RemoveDestFactory(destKey)
+
+	// Register PIndexImplType
+	indexType := "sharded_cleanup_index_" + t.Name()
+	cbgt.RegisterPIndexImplType(indexType, &cbgt.PIndexImplType{
+		New: func(indexType, indexParams, path string, restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
+			var outerParams struct {
+				Params string `json:"params"`
+			}
+			err := JSONUnmarshal([]byte(indexParams), &outerParams)
+			if err != nil {
+				return nil, nil, err
+			}
+
+			var sgParams SGFeedIndexParams
+			err = JSONUnmarshal([]byte(outerParams.Params), &sgParams)
+			if err != nil {
+				return nil, nil, err
+			}
+
+			destFactory, err := FetchDestFactory(sgParams.DestKey)
+			if err != nil {
+				return nil, nil, err
+			}
+			dest, err := destFactory(restart)
+			return &mockPIndexImpl{dest: dest}, dest, err
+		},
+	})
+
+	// Start Sharded DCP Feed
+	cfg, _ := NewCbgtCfgMem()
+	heartbeater := &mockHeartbeater{}
+
+	indexName, err := GenerateCBGTIndexName(t.Name(), ShardedDCPFeedTypeImport)
+	require.NoError(t, err)
+
+	opts := ShardedDCPOptions{
+		Bucket:        bucket,
+		Cfg:           cfg,
+		DBName:        t.Name(),
+		UUID:          "testUUID",
+		IndexName:     indexName,
+		IndexType:     indexType,
+		DestKey:       destKey,
+		Heartbeater:   heartbeater,
+		NumPartitions: 1,
+		FeedType:      ShardedDCPFeedTypeImport,
+		Collections:   collections,
+		Datastore:     bucket.GetMetadataStore(),
+	}
+
+	cbgtContext, err := StartShardedDCPFeed(ctx, opts)
+	require.NoError(t, err)
+	defer cbgtContext.Stop(ctx)
+
+	// Write one doc per collection
+	for i, ds := range dataStores {
+		docID := fmt.Sprintf("doc_%d", i)
+		err := ds.Set(docID, 0, nil, map[string]any{"foo": "bar"})
+		require.NoError(t, err)
+	}
+
+	// Wait for docs
+	select {
+	case <-doneDocs:
+	case <-time.After(30 * time.Second):
+		t.Fatalf("Timed out waiting for docs")
+	}
+
+	// Stop feed
+	cbgtContext.Stop(ctx)
+
+	// Verify checkpoints exist
+	foundAny := false
+	for vb := uint16(0); vb < maxVbno; vb++ {
+		cpID := fmt.Sprintf("%s%d", checkpointPrefix, vb)
+		_, _, err := bucket.GetMetadataStore().GetRaw(cpID)
+		if err == nil {
+			foundAny = true
+			break
+		}
+	}
+	require.True(t, foundAny, "Should have found at least one checkpoint")
+
+	// Purge
+	err = PurgeDCPCheckpoints(ctx, bucket.GetMetadataStore(), checkpointPrefix, DCPFeedSharded)
+	require.NoError(t, err)
+
+	// Verify gone
+	for vb := uint16(0); vb < maxVbno; vb++ {
+		cpID := fmt.Sprintf("%s%d", checkpointPrefix, vb)
+		_, _, err := bucket.GetMetadataStore().GetRaw(cpID)
+		require.True(t, IsDocNotFoundError(err), "Checkpoint %s should have been purged, but got error: %v", cpID, err)
+	}
+
+	// Delete each found document by name. Without a reliable mapping from
+	// collection ID back to a specific DataStore, attempt the delete in every
+	// data store and require that at least one succeeds.
+	foundDocsLock.Lock()
+	defer foundDocsLock.Unlock()
+	for _, doc := range foundDocs {
+		deleted := false
+		for _, ds := range dataStores {
+			err := ds.Delete(doc.docID)
+			if err == nil {
+				deleted = true
+			}
+		}
+		require.True(t, deleted, "Failed to delete found document: %s", doc.docID)
+	}
+}
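
The registered `PIndexImplType.New` above unwraps a nested envelope: cbgt hands the implementation a JSON document whose `params` field is itself a JSON-encoded string containing the SG feed params. A self-contained sketch of that shape (the `destKey` JSON tag and value are assumptions for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Inner params are serialized first, then embedded as a string.
	inner, _ := json.Marshal(map[string]string{"destKey": "TestShardedDCPCheckpointCleanup"})
	outer, _ := json.Marshal(map[string]string{"params": string(inner)})

	// First unmarshal peels the envelope...
	var envelope struct {
		Params string `json:"params"`
	}
	_ = json.Unmarshal(outer, &envelope)

	// ...second unmarshal decodes the embedded params string.
	var sgParams struct {
		DestKey string `json:"destKey"`
	}
	_ = json.Unmarshal([]byte(envelope.Params), &sgParams)
	fmt.Println(sgParams.DestKey) // TestShardedDCPCheckpointCleanup
}
```
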
diff --git a/base/rosmar_dcp_client.go b/base/rosmar_dcp_client.go
index 6e498ac11b..ddf1c3e851 100644
--- a/base/rosmar_dcp_client.go
+++ b/base/rosmar_dcp_client.go
@@ -97,8 +97,7 @@ func (dc *RosmarDCPClient) metadataStore() sgbucket.DataStore {
 // PurgeCheckpoints deletes the checkpoint document for the feed. Calling this function while the feed is running
 // will not alter the feed nor remove the checkpoint for the future.
 func (dc *RosmarDCPClient) PurgeCheckpoints() error {
-	checkpoint := dc.opts.CheckpointPrefix + ":" + dc.opts.FeedID
-	err := dc.metadataStore().Delete(checkpoint)
+	err := dc.metadataStore().Delete(dc.opts.CheckpointPrefix)
 	if err != nil && !IsDocNotFoundError(err) {
 		return err
 	}
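
The rosmar checkpoint document is now keyed by the checkpoint prefix alone, which keeps `RosmarDCPClient.PurgeCheckpoints` and the `DCPFeedRosmar` branch of `PurgeDCPCheckpoints` in agreement. A minimal sketch of the shared shape, assuming a rosmar/walrus metadata store:

```go
// Equivalent to both purge paths for rosmar: delete the single checkpoint doc
// keyed by the prefix itself, tolerating absence.
func purgeRosmarCheckpoint(metadataStore base.DataStore, checkpointPrefix string) error {
	if err := metadataStore.Delete(checkpointPrefix); err != nil && !base.IsDocNotFoundError(err) {
		return err
	}
	return nil
}
```
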
diff --git a/db/background_mgr_attachment_compaction.go b/db/background_mgr_attachment_compaction.go
index becbde55bd..1f0d392c0c 100644
--- a/db/background_mgr_attachment_compaction.go
+++ b/db/background_mgr_attachment_compaction.go
@@ -190,13 +190,23 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin
 	return nil
 }
 
+// purgeCheckpoints removes the checkpoints for a specific checkpoint prefix.
+func (*AttachmentCompactionManager) purgeCheckpoints(ctx context.Context, database *Database, checkpointPrefix string) error {
+	return base.PurgeDCPCheckpoints(
+		ctx,
+		database.MetadataStore,
+		checkpointPrefix,
+		database.dcpFeedMode(),
+	)
+}
+
 func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase attachmentCompactionPhase, keyPrefix string) (bool, error) {
 	var rollbackErr gocbcore.DCPRollbackError
 	if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) {
 		base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase)
 		// to rollback any phase for attachment compaction we need to purge all persisted dcp metadata
 		base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID)
-		err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID)
+		err = a.purgeCheckpoints(ctx, database, keyPrefix)
 		if err != nil {
 			base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err)
 			return false, err
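
The rollback handler relies on `errors.As` matching a typed error anywhere in the wrap chain before purging metadata. A self-contained sketch of that detection pattern, with `mockRollbackError` standing in for `gocbcore.DCPRollbackError`:

```go
package main

import (
	"errors"
	"fmt"
)

type mockRollbackError struct{ SeqNo uint64 }

func (e mockRollbackError) Error() string {
	return fmt.Sprintf("rollback to seq %d", e.SeqNo)
}

func main() {
	err := fmt.Errorf("feed failed: %w", mockRollbackError{SeqNo: 0})
	var rollbackErr mockRollbackError
	if errors.As(err, &rollbackErr) {
		// In the manager, this is where persisted DCP metadata gets purged
		// and the task restarts from sequence zero.
		fmt.Println("rollback detected, purging checkpoints")
	}
}
```
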
diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go
index 9780f893be..8f2dbe1a40 100644
--- a/db/background_mgr_attachment_migration.go
+++ b/db/background_mgr_attachment_migration.go
@@ -162,7 +162,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string
 		if err != nil {
 			return err
 		}
-		dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback)
+		dcpOptions := a.getDCPClientOptions(a.MigrationID, scopes, callback)
 
 		// check for mismatch in collection id's between current collections on the db and prev run
 
@@ -286,21 +286,26 @@ func (a *AttachmentMigrationManager) GetProcessStatus(status BackgroundManagerSt
 	return statusJSON, metaJSON, err
 }
 
+// getCheckpointPrefix returns the checkpoint prefix for attachment migration checkpoints.
+func (a *AttachmentMigrationManager) getCheckpointPrefix(migrationID string) string {
+	return fmt.Sprintf("%s:sg-%v:att_migration:%v",
+		a.databaseCtx.MetadataKeys.DCPCheckpointPrefix(a.databaseCtx.Options.GroupID),
+		base.ProductAPIVersion,
+		migrationID,
+	)
+}
+
-// getMigrationDCPClientOptions returns options for DCP client for attachment migration. CollectionIDs represent the Couchbase Server
-// CollectionIDs and prefix represents the checkpoint prefix for checkpoint documents.
-func getMigrationDCPClientOptions(db *DatabaseContext, migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions {
+// getDCPClientOptions returns options for the DCP client used for attachment migration. scopes represents the
+// Couchbase Server collections to stream; the checkpoint prefix identifies this migration's checkpoint documents.
+func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions {
 	return base.DCPClientOptions{
 		FeedID:            fmt.Sprintf("att_migration:%v", migrationID),
 		OneShot:           true,
 		FailOnRollback:    false,
 		MetadataStoreType: base.DCPMetadataStoreCS,
 		CollectionNames:   scopes,
-		CheckpointPrefix: fmt.Sprintf("%s:sg-%v:att_migration:%v",
-			db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID),
-			base.ProductAPIVersion,
-			migrationID,
-		),
-		Callback: callback,
+		CheckpointPrefix:  a.getCheckpointPrefix(migrationID),
+		Callback:          callback,
 	}
 }
 
@@ -321,6 +326,16 @@ type AttachmentMigrationManagerStatusDoc struct {
 	AttachmentMigrationMeta `json:"meta"`
 }
 
+// purgeCheckpoints will remove the checkpoints for a specific migration ID.
+func (a *AttachmentMigrationManager) purgeCheckpoints(ctx context.Context, db *DatabaseContext, migrationID string) error {
+	return base.PurgeDCPCheckpoints(
+		ctx,
+		db.MetadataStore,
+		a.getCheckpointPrefix(migrationID),
+		db.dcpFeedMode(),
+	)
+}
+
 // resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run
 func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) error {
 	// if we are on our first run, no collections will be defined on the manager yet
@@ -329,7 +344,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex
 	}
 	if len(a.CollectionIDs) != len(collectionIDs) {
 		base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID)
-		err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID)
+		err := a.purgeCheckpoints(ctx, database, a.MigrationID)
 		if err != nil {
 			return err
 		}
@@ -340,10 +355,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex
 	purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs)
 	if purgeNeeded != 0 {
 		base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID)
-		err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID)
-		if err != nil {
-			return err
-		}
+		return a.purgeCheckpoints(ctx, database, a.MigrationID)
 	}
 	return nil
 }
diff --git a/db/background_mgr_attachment_migration_test.go b/db/background_mgr_attachment_migration_test.go
index 09db21a5d8..1db42ebd4f 100644
--- a/db/background_mgr_attachment_migration_test.go
+++ b/db/background_mgr_attachment_migration_test.go
@@ -296,8 +296,9 @@ func TestAttachmentMigrationCheckpointPrefix(t *testing.T) {
 	)
 	require.NoError(t, err)
 	defer db.Close(ctx)
-	clientOptions := getMigrationDCPClientOptions(
-		db,
+
+	mgr := NewAttachmentMigrationManager(db)
+	clientOptions := mgr.Process.(*AttachmentMigrationManager).getDCPClientOptions(
 		migrationID,
 		db.collectionNameSet(),
 		func(sgbucket.FeedEvent) bool {
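
`getCheckpointPrefix` composes the key from the database's DCP checkpoint prefix, the product API version, and the migration ID. A self-contained sketch of the resulting key shape; all three concrete values here are hypothetical stand-ins:

```go
package main

import "fmt"

func main() {
	dcpCheckpointPrefix := "_sync:dcp_ck:" // hypothetical DCPCheckpointPrefix(groupID) result
	productAPIVersion := "4.0"             // hypothetical base.ProductAPIVersion
	migrationID := "f81c49f2"              // hypothetical migration UUID fragment

	// Same Sprintf shape as getCheckpointPrefix.
	prefix := fmt.Sprintf("%s:sg-%v:att_migration:%v", dcpCheckpointPrefix, productAPIVersion, migrationID)
	fmt.Println(prefix) // _sync:dcp_ck::sg-4.0:att_migration:f81c49f2
}
```
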
diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go
index b251ec2818..46bc975647 100644
--- a/db/background_mgr_resync_dcp.go
+++ b/db/background_mgr_resync_dcp.go
@@ -103,6 +103,13 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
 		return nil
 	}
 
+	if statusDoc.ResyncID != "" {
+		err := r.purgeCheckpoints(ctx, db, statusDoc.ResyncID)
+		if err != nil {
+			base.WarnfCtx(ctx, "Failed to delete checkpoints for previous resync ID %q: %v, these will be abandoned and unused", statusDoc.ResyncID, err)
+		}
+	}
+
 	newID, err := uuid.NewRandom()
 	if err != nil {
 		return err
@@ -112,6 +119,16 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
 	return nil
 }
 
+// purgeCheckpoints removes checkpoints for a given resync run.
+func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error {
+	return base.PurgeDCPCheckpoints(
+		ctx,
+		db.MetadataStore,
+		GetResyncDCPCheckpointPrefix(db.DatabaseContext, resyncID, r.Distributed),
+		db.distributedDCPFeedMode(),
+	)
+}
+
 // SetVBUUIDs updates vbuuids in the manager.
 func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) {
 	r.lock.Lock()
@@ -282,7 +299,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
 
 		defer resyncCbgtContext.Stop(ctx)
 	} else {
-		clientOptions := getResyncDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false)
+		clientOptions := r.getDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false)
 		var err error
 		dcpClient, err = base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions)
 		if err != nil {
@@ -507,7 +524,7 @@ func initializePrincipalDocsIndex(ctx context.Context, db *Database) error {
 
-// getResyncDCPClientOptions returns the default set of DCPClientOptions suitable for resync. collectionIDs
-// represent Couchbase Server collection IDs. prefix represents the prefixed name of the checkpoint documents
-// used to store DCP checkpoints.
-func getResyncDCPClientOptions(db *DatabaseContext, resyncID string, collectionNames base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc, distributed bool) base.DCPClientOptions {
+// getDCPClientOptions returns the default set of DCPClientOptions suitable for resync. collectionNames
+// represents the Couchbase Server collections to stream. The checkpoint prefix names the checkpoint documents
+// used to store DCP checkpoints.
+func (r *ResyncManagerDCP) getDCPClientOptions(db *DatabaseContext, resyncID string, collectionNames base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc, distributed bool) base.DCPClientOptions {
 	return base.DCPClientOptions{
 		FeedID:  fmt.Sprintf("resync:%v", resyncID),
 		OneShot: true,
diff --git a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go
index c4a5c90e1b..4cff49643d 100644
--- a/db/background_mgr_resync_dcp_test.go
+++ b/db/background_mgr_resync_dcp_test.go
@@ -738,7 +738,7 @@ func TestResyncCheckpointPrefix(t *testing.T) {
 	)
 	require.NoError(t, err)
 	defer db.Close(ctx)
-	clientOptions := getResyncDCPClientOptions(
+	clientOptions := db.ResyncManager.Process.(*ResyncManagerDCP).getDCPClientOptions(
 		db,
 		resyncID,
 		test.collectionNames,
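
`Init` now purges the previous run's checkpoints before minting a new resync ID, and treats a failed purge as non-fatal since the new run writes under a fresh ID. A self-contained sketch of that best-effort pattern; `purgePrevious` is a hypothetical stand-in for `r.purgeCheckpoints`:

```go
package main

import (
	"context"
	"log"

	"github.com/google/uuid"
)

func initResyncID(ctx context.Context, previousResyncID string, purgePrevious func(context.Context, string) error) (string, error) {
	if previousResyncID != "" {
		if err := purgePrevious(ctx, previousResyncID); err != nil {
			// Orphaned checkpoints are harmless: the new run uses a fresh
			// resync ID, so the old documents are never read again.
			log.Printf("failed to delete checkpoints for previous resync ID %q: %v", previousResyncID, err)
		}
	}
	newID, err := uuid.NewRandom()
	if err != nil {
		return "", err
	}
	return newID.String(), nil
}
```
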
diff --git a/db/database.go b/db/database.go
index d12bba1813..6749fa2ffa 100644
--- a/db/database.go
+++ b/db/database.go
@@ -2510,29 +2510,6 @@ func (db *DatabaseContext) GetCollectionIDs() []uint32 {
 	return maps.Keys(db.CollectionByID)
 }
 
-// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0
-func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string) error {
-
-	bucket, err := base.AsGocbV2Bucket(database.Bucket)
-	if err != nil {
-		checkpoint := checkpointPrefix + ":" + feedPrefix
-		err := database.MetadataStore.Delete(checkpoint)
-		if err != nil && !base.IsDocNotFoundError(err) {
-			return err
-		}
-		return nil
-	}
-	numVbuckets, err := bucket.GetMaxVbno()
-	if err != nil {
-		return err
-	}
-
-	datastore := database.MetadataStore
-	metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, checkpointPrefix)
-	metadata.Purge(ctx, base.DefaultNumWorkers)
-	return nil
-}
-
 func (db *DatabaseContext) EnableAllowConflicts(tb testing.TB) {
 	db.Options.AllowConflicts = base.Ptr(true)
 }
@@ -2591,3 +2568,22 @@ func (db *DatabaseContext) usingRosmar() bool {
 func (db *DatabaseContext) WaitForSequenceNotSkipped(ctx context.Context, targetSequence uint64) error {
 	return db.changeCache.waitForSequenceNotSkipped(ctx, targetSequence, defaultWaitForSequence)
 }
+
+// dcpFeedMode returns the DCPFeedMode for the backing store when the DCP feed always runs on a single node.
+func (db *DatabaseContext) dcpFeedMode() base.DCPFeedMode {
+	if db.usingRosmar() {
+		return base.DCPFeedRosmar
+	}
+	return base.DCPFeedGocb
+}
+
+// distributedDCPFeedMode returns the DCPFeedMode for the backing store when sharded DCP feeds are supported.
+func (db *DatabaseContext) distributedDCPFeedMode() base.DCPFeedMode {
+	if db.usingRosmar() {
+		return base.DCPFeedRosmar
+	}
+	if db.useShardedDCP() {
+		return base.DCPFeedSharded
+	}
+	return base.DCPFeedGocb
+}
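
Together, these two helpers give every background manager a single decision point for which purge strategy `base.PurgeDCPCheckpoints` applies. A minimal sketch of the mode selection they encode, with the three booleans as hypothetical stand-alone inputs rather than `DatabaseContext` methods:

```go
func feedMode(usingRosmar, distributed, useShardedDCP bool) base.DCPFeedMode {
	if usingRosmar {
		return base.DCPFeedRosmar // single checkpoint doc, keyed by the prefix itself
	}
	if distributed && useShardedDCP {
		return base.DCPFeedSharded // one checkpoint doc per vbucket
	}
	return base.DCPFeedGocb // DefaultNumWorkers checkpoint docs
}
```
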