From 2f70a98e8ded424211be7b5b94183ea12624fe4d Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 5 May 2026 09:55:46 -0400 Subject: [PATCH 1/9] Purge checkpoints better --- base/dcp_client.go | 3 + base/dcp_dest.go | 1 + base/dcp_sharded.go | 22 ++++- db/attachment_compaction.go | 13 ++- db/attachment_compaction_test.go | 22 ++--- db/background_mgr_attachment_compaction.go | 39 +++++++-- db/background_mgr_attachment_migration.go | 92 ++++++++++++--------- db/background_mgr_resync_dcp.go | 94 +++++++++++++++++++++- db/database.go | 23 ------ 9 files changed, 213 insertions(+), 96 deletions(-) diff --git a/base/dcp_client.go b/base/dcp_client.go index b18ded7188..4302a648e1 100644 --- a/base/dcp_client.go +++ b/base/dcp_client.go @@ -18,6 +18,9 @@ import ( "github.com/couchbaselabs/rosmar" ) +// DCPCheckpointPurgeFunc allows purging of DCP Checkpoints +type DCPCheckpointPurgeFunc func() error + // DCPClient is an interface for all DCP implementations. type DCPClient interface { // Start will start the DCP feed. It returns a channel marking the end of the feed. diff --git a/base/dcp_dest.go b/base/dcp_dest.go index d285578bfb..14b55bbc9a 100644 --- a/base/dcp_dest.go +++ b/base/dcp_dest.go @@ -29,6 +29,7 @@ func init() { type SGDest interface { cbgt.Dest + cbgt.DestEx } // DCPDest implements SGDest (superset of cbgt.Dest) interface to manage updates coming from a diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 8383a9ccc0..908dcc1d92 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -22,9 +22,10 @@ import ( "sync" "time" + "errors" + "github.com/couchbase/cbgt" sgbucket "github.com/couchbase/sg-bucket" - "github.com/pkg/errors" ) const ( @@ -233,7 +234,7 @@ func createCBGTIndex(ctx context.Context, c *CbgtContext, opts ShardedDCPOptions vbNo, err := opts.Bucket.GetMaxVbno() if err != nil { - return errors.Wrapf(err, "Unable to retrieve maxVbNo for bucket %s", MD(opts.Bucket.GetName()).Redact()) + return RedactErrorf("Unable to retrieve maxVbNo for bucket %s: %w", MD(opts.Bucket.GetName()), err) } numPartitions := opts.NumPartitions @@ -308,7 +309,7 @@ func getCBGTIndexUUID(manager *cbgt.Manager, indexName string) (previousUUID str _, indexDefsMap, err := manager.GetIndexDefs(true) if err != nil { - return "", errors.Wrapf(err, "Error calling CBGT GetIndexDefs() on index: %s", indexName) + return "", fmt.Errorf("Error calling CBGT GetIndexDefs() on index: %s: %w", indexName, err) } indexDef, ok := indexDefsMap[indexName] @@ -920,3 +921,18 @@ func (meh *sgMgrEventHandlers) OnFeedError(_ string, r cbgt.Feed, feedErr error) dcpFeed.NotifyMgrOnClose() } } + +func DeleteShardedDCPCheckpoints(ctx context.Context, datastore DataStore, vbCount uint16, checkpointPrefix string) error { + var errs []error + for vbNo := range vbCount { + checkpointID := fmt.Sprintf("%s_%d", checkpointPrefix, vbNo) + err := datastore.Delete(checkpointID) + if err != nil && !IsDocNotFoundError(err) { + errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err)) + } + } + if errs != nil { + return errors.Join(errs...) + } + return nil +} diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 2c7ff1cc38..35754b933e 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -424,7 +424,7 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, // attachmentCompactCleanupPhase runs a DCP feed to clean up all documents with an attachment compaction xattr. 
Returns // the DCP checkpoint prefix and any error encountered. -func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (string, error) { +func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (base.DCPClient, error) { base.InfofCtx(ctx, base.KeyAll, "Starting third phase of attachment compaction (cleanup phase) with compactionID: %q", compactionID) compactionLoggingID := "Compaction Cleanup: " + compactionID @@ -523,16 +523,15 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) - return "", err + return nil, err } - metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() doneChan, err := dcpClient.Start() if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) // simplify close in CBG-2234 _ = dcpClient.Close() - return metadataKeyPrefix, err + return dcpClient, err } select { @@ -545,18 +544,18 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore err = dcpClient.Close() if err != nil { base.WarnfCtx(ctx, "[%s] Failed to close attachment compaction DCP client! %v", compactionLoggingID, err) - return metadataKeyPrefix, err + return dcpClient, err } err = <-doneChan if err != nil { - return metadataKeyPrefix, err + return dcpClient, err } base.InfofCtx(ctx, base.KeyAll, "[%s] Cleanup phase of attachment compaction was terminated", compactionLoggingID) } - return metadataKeyPrefix, err + return dcpClient, err } // getCompactionIDSubDocPath is just a tiny helper func that just concatenates the subdoc path we're using to store diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go index 3582635d80..8a724cabf0 100644 --- a/db/attachment_compaction_test.go +++ b/db/attachment_compaction_test.go @@ -203,9 +203,9 @@ func TestAttachmentCleanup(t *testing.T) { } terminator := base.NewSafeTerminator() - checkpointPrefix, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator) + _, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator) require.NoError(t, err) - require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentCleanup_cleanup", base.ProductAPIVersion), checkpointPrefix) + //require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentCleanup_cleanup", base.ProductAPIVersion), checkpointPrefix) for _, docID := range singleMarkedAttIDs { _, _, err := dataStore.GetXattrs(ctx, docID, []string{base.AttachmentCompactionXattrName}) @@ -391,9 +391,9 @@ func TestAttachmentMarkAndSweepAndCleanup(t *testing.T) { } } - checkpointPrefix, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator) + dcpClient, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator) require.NoError(t, err) - require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentMarkAndSweepAndCleanup_cleanup", base.ProductAPIVersion), checkpointPrefix) + require.Equal(t, 
fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentMarkAndSweepAndCleanup_cleanup", base.ProductAPIVersion), dcpClient.GetMetadataKeyPrefix()) for _, attDocKey := range attKeys { var back any @@ -499,19 +499,7 @@ func TestAttachmentCompactionRunTwice(t *testing.T) { err = testDB1.AttachmentCompactionManager.Start(ctx1, map[string]any{"database": testDB1}) assert.NoError(t, err) - err = WaitForConditionWithOptions(t, func() bool { - var status AttachmentManagerResponse - rawStatus, err := testDB1.AttachmentCompactionManager.GetStatus(ctx1) - assert.NoError(t, err) - err = base.JSONUnmarshal(rawStatus, &status) - require.NoError(t, err) - - if status.State == BackgroundProcessStateStopped { - return true - } - - return false - }, 200, 1000) + RequireBackgroundManagerState(t, testDB1.AttachmentCompactionManager, BackgroundProcessStateStopped) // Kick off another run with an attempted start from the other node, checks for error on other node triggerCallback = true diff --git a/db/background_mgr_attachment_compaction.go b/db/background_mgr_attachment_compaction.go index becbde55bd..62f70ff2bc 100644 --- a/db/background_mgr_attachment_compaction.go +++ b/db/background_mgr_attachment_compaction.go @@ -11,10 +11,12 @@ package db import ( "context" "errors" + "fmt" "sync" "time" "github.com/couchbase/gocbcore/v10" + sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" "github.com/google/uuid" ) @@ -102,6 +104,31 @@ func (a *AttachmentCompactionManager) Init(ctx context.Context, options map[stri return newRunInit() } +func (a *AttachmentCompactionManager) purgeCheckpoints(ctx context.Context, compactionID string, db *Database, dataStore sgbucket.DataStore) error { + fakeCallback := func(event sgbucket.FeedEvent) bool { return false } + var errs []error + for _, phase := range []attachmentCompactionPhase{MarkPhase, CleanupPhase} { + clientOptions := getCompactionDCPClientOptions( + db, + compactionID, + base.NewCollectionNameSet(dataStore), + phase, + fakeCallback, + ) + + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) + if err != nil { + errs = append(errs, fmt.Errorf("Could not create a dcp client phase %q in order to purge checkpoints: %w", phase, err)) + continue + } + err = dcpClient.PurgeCheckpoints() + if err != nil { + errs = append(errs, fmt.Errorf("error purging checkpoints for phase %q: %w", phase, err)) + } + } + return errors.Join(errs...) 
+} + func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { database := options["database"].(*Database) @@ -143,7 +170,7 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return false, err, nil } - shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, dcpClient.GetMetadataKeyPrefix()) + shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, dcpClient.PurgeCheckpoints) } return shouldRetry, err, nil } @@ -169,9 +196,9 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin a.SetPhase("cleanup") worker := func() (shouldRetry bool, err error, value any) { persistClusterStatus() - metadataKeyPrefix, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator) - if err != nil { - shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, metadataKeyPrefix) + dcpClient, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator) + if err != nil && dcpClient != nil { + shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, dcpClient.PurgeCheckpoints) } return shouldRetry, err, nil } @@ -190,13 +217,13 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return nil } -func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase attachmentCompactionPhase, keyPrefix string) (bool, error) { +func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase attachmentCompactionPhase, checkpointPurgeFunc base.DCPCheckpointPurgeFunc) (bool, error) { var rollbackErr gocbcore.DCPRollbackError if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) { base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase) // to rollback any phase for attachment compaction we need to purge all persisted dcp metadata base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID) - err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID) + err = checkpointPurgeFunc() if err != nil { base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err) return false, err diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index 9780f893be..cd72f61500 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -54,6 +54,17 @@ func NewAttachmentMigrationManager(database *DatabaseContext) *BackgroundManager func (a *AttachmentMigrationManager) Init(ctx context.Context, options map[string]any, clusterStatus []byte) error { newRunInit := func() error { + if a.MigrationID != "" { + dcpClient, err := a.newDCPClient(ctx, a.databaseCtx) + if err != nil { + return fmt.Errorf("Could not create a DCP client when preparing to reset checkpoints: %w", err) + } + base.InfofCtx(ctx, base.KeyAll, "Attachment Migration: 
Resetting checkpoints for new migration run with migration ID: %s", a.MigrationID) + err = dcpClient.PurgeCheckpoints() + if err != nil { + return fmt.Errorf("Could not purge checkpoints when preparing for new migration run: %w", err) + } + } uniqueUUID, err := uuid.NewRandom() if err != nil { return err @@ -92,19 +103,9 @@ func (a *AttachmentMigrationManager) Init(ctx context.Context, options map[strin return newRunInit() } -func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { - db := a.databaseCtx - migrationLoggingID := "Migration: " + a.MigrationID - - persistClusterStatus := func() { - err := persistClusterStatusCallback(ctx) - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to persist latest cluster status for attachment migration: %v", migrationLoggingID, err) - } - } - defer persistClusterStatus() - - callback := func(event sgbucket.FeedEvent) bool { +func (a *AttachmentMigrationManager) getDCPCallback(ctx context.Context, db *DatabaseContext) sgbucket.FeedEventCallbackFunc { + migrationLoggingID := a.migrationLoggingID() + return func(event sgbucket.FeedEvent) bool { docID := string(event.Key) collection := db.CollectionByID[event.CollectionID] base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", migrationLoggingID, event.Opcode, base.UD(docID)) @@ -157,26 +158,56 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string a.docsChanged.Add(1) return true } +} - scopes, currCollectionIDs, err := getCollectionsForAttachmentMigration(db) +// newDCPClient creates a DCP client for the attachment migration process. +func (a *AttachmentMigrationManager) newDCPClient(ctx context.Context, db *DatabaseContext) (base.DCPClient, error) { + scopes, _, err := getCollectionsForAttachmentMigration(db) if err != nil { - return err + return nil, err } - dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback) + dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, a.getDCPCallback(ctx, db)) + return base.NewDCPClient(ctx, db.Bucket, dcpOptions) +} - // check for mismatch in collection id's between current collections on the db and prev run +func (a *AttachmentMigrationManager) migrationLoggingID() string { + return "Migration: " + a.MigrationID +} - err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, currCollectionIDs) +func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { + db := a.databaseCtx + migrationLoggingID := a.migrationLoggingID() + + persistClusterStatus := func() { + err := persistClusterStatusCallback(ctx) + if err != nil { + base.WarnfCtx(ctx, "[%s] Failed to persist latest cluster status for attachment migration: %v", migrationLoggingID, err) + } + } + defer persistClusterStatus() + + _, currCollectionIDs, err := getCollectionsForAttachmentMigration(db) if err != nil { return err } + // check for mismatch in collection id's between current collections on the db and prev run + shouldPurgeCheckpoints := a.shouldResetCheckpoints(ctx, db, getMigrationDCPClientOptions(db, a.MigrationID, nil, nil).CheckpointPrefix, currCollectionIDs) a.SetCollectionIDs(currCollectionIDs) - dcpClient, err := base.NewDCPClient(ctx, db.Bucket, dcpOptions) + + dcpClient, err := a.newDCPClient(ctx, db) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to 
create attachment migration DCP client: %v", migrationLoggingID, err) return err } + if shouldPurgeCheckpoints { + base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) + err := dcpClient.PurgeCheckpoints() + if err != nil { + return err + } + } + base.DebugfCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for attachment migration", migrationLoggingID) doneChan, err := dcpClient.Start() @@ -321,31 +352,18 @@ type AttachmentMigrationManagerStatusDoc struct { AttachmentMigrationMeta `json:"meta"` } -// resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run -func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) error { +// shouldResetCheckpoints returns true if the collection data does not match the previous data. +func (a *AttachmentMigrationManager) shouldResetCheckpoints(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) bool { // if we are on our first run, no collections will be defined on the manager yet if len(a.CollectionIDs) == 0 { - return nil + return false } if len(a.CollectionIDs) != len(collectionIDs) { - base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) - if err != nil { - return err - } - return nil + return true } slices.Sort(collectionIDs) slices.Sort(a.CollectionIDs) - purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs) - if purgeNeeded != 0 { - base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) - if err != nil { - return err - } - } - return nil + return slices.Compare(collectionIDs, a.CollectionIDs) != 0 } // getCollectionsForAttachmentMigration will get all datastores. 
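Note: the shouldResetCheckpoints rewrite above reduces the checkpoint-reset decision to a set-equality check over sorted collection-ID slices. The sketch below restates that logic as a standalone program; the helper name collectionIDsChanged and the cloned inputs are illustrative, not part of this patch.

package main

import (
	"fmt"
	"slices"
)

// collectionIDsChanged is a hypothetical helper mirroring shouldResetCheckpoints:
// it reports whether the collection IDs recorded by the previous run differ from
// the current ones. A mismatch means the persisted DCP checkpoints describe a
// different set of collections and should be purged before the feed restarts.
func collectionIDsChanged(previous, current []uint32) bool {
	// First run: nothing was recorded, so there is nothing to invalidate.
	if len(previous) == 0 {
		return false
	}
	if len(previous) != len(current) {
		return true
	}
	// Compare as sets: sort copies so ordering differences don't force a purge.
	prev := slices.Clone(previous)
	curr := slices.Clone(current)
	slices.Sort(prev)
	slices.Sort(curr)
	return slices.Compare(prev, curr) != 0
}

func main() {
	fmt.Println(collectionIDsChanged([]uint32{9, 8}, []uint32{8, 9}))  // false: same set
	fmt.Println(collectionIDsChanged([]uint32{8, 9}, []uint32{8, 12})) // true: IDs differ
}

Unlike the patch, which sorts a.CollectionIDs in place, the sketch clones before sorting so the callers' slices are untouched; either works here since the manager owns the slice.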
diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index b251ec2818..41425e4d43 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -103,6 +103,13 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu return nil } + if statusDoc.ResyncID != "" { + err := r.purgeCheckpoints(ctx, db, statusDoc.ResyncID) + if err != nil { + base.WarnfCtx(ctx, "Failed to delete checkpoints for previous resync ID %q: %v, these will be abandoned and unused", statusDoc.ResyncID, err) + } + } + newID, err := uuid.NewRandom() if err != nil { return err @@ -112,6 +119,25 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu return nil } +func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error { + if resyncID == "" { + return errors.New("resyncID is required to delete checkpoints") + } + checkpointPrefix := GetResyncDCPCheckpointPrefix(db.DatabaseContext, resyncID, r.Distributed) + if !r.Distributed { + dcpClient, err := r.newDCPClient(ctx, db, false) + if err != nil { + return fmt.Errorf("error creating DCP client to delete checkpoints for resync ID %q: %v", resyncID, err) + } + return dcpClient.PurgeCheckpoints() + } + vbCount, err := db.Bucket.GetMaxVbno() + if err != nil { + return fmt.Errorf("error getting vb count for checkpoint deletion: %v", err) + } + return base.DeleteShardedDCPCheckpoints(ctx, db.MetadataStore, vbCount, checkpointPrefix) +} + // SetVBUUIDs updates vbuuids in the manager. func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) { r.lock.Lock() @@ -119,6 +145,70 @@ func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) { r.VBUUIDs = vbuuids } +func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, regenerateSequences bool) sgbucket.FeedEventCallbackFunc { + return func(event sgbucket.FeedEvent) bool { + docID := string(event.Key) + base.TracefCtx(ctx, base.KeyAll, "Resync: Received DCP event %d for doc %v", event.Opcode, base.UD(docID)) + + // Ignore documents without xattrs if possible, to avoid processing unnecessary documents + if r.useXattrs && event.DataType&base.MemcachedDataTypeXattr == 0 { + return true + } + // Don't want to process raw binary docs + // The binary check should suffice but for additional safety also check for empty bodies. This will also avoid + // processing tombstones. + if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 { + return true + } + + // We only want to process full docs. Not any sync docs. 
+ if strings.HasPrefix(docID, base.SyncDocPrefix) { + return true + } + + r.DocsProcessed.Add(1) + db.DbStats.Database().ResyncNumProcessed.Add(1) + databaseCollection := db.CollectionByID[event.CollectionID] + databaseCollection.collectionStats.ResyncNumProcessed.Add(1) + ctx = databaseCollection.AddCollectionContext(ctx) + doc, err := bucketDocumentFromFeed(event) + if err != nil { + base.WarnfCtx(ctx, "Resync: Error getting document from DCP event for doc %q: %v", base.UD(docID), err) + return false + } + err = (&DatabaseCollectionWithUser{ + DatabaseCollection: databaseCollection, + }).ResyncDocument(ctx, docID, doc, regenerateSequences) + + if err == nil { + r.DocsChanged.Add(1) + db.DbStats.Database().ResyncNumChanged.Add(1) + databaseCollection.collectionStats.ResyncNumChanged.Add(1) + } else if err != base.ErrUpdateCancel { + base.WarnfCtx(ctx, "Resync: Error updating doc %q: %v", base.UD(docID), err) + return false + } + return true + } +} + +// newDCPClient creates a DCP client for resync with the appropriate options and callback. +func (r *ResyncManagerDCP) newDCPClient(ctx context.Context, db *Database, regenerateSequences bool) (base.DCPClient, error) { + clientOptions := getResyncDCPClientOptions( + db.DatabaseContext, + r.ResyncID, + r.ResyncedCollections.ToCollectionNameSet(), + r.getDCPCallback(ctx, db, regenerateSequences), + r.Distributed, + ) + dcpClient, err := base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions) + if err != nil { + base.WarnfCtx(ctx, "Failed to create resync DCP client! %v", err) + return nil, err + } + return dcpClient, nil +} + // Run starts a DCP feed to process documents for resync. func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) (err error) { db, ok := options["database"].(*Database) @@ -282,9 +372,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers defer resyncCbgtContext.Stop(ctx) } else { - clientOptions := getResyncDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false) - var err error - dcpClient, err = base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions) + dcpClient, err = r.newDCPClient(ctx, db, regenerateSequences) if err != nil { base.WarnfCtx(ctx, "Failed to create resync DCP client! 
%v", err) return err diff --git a/db/database.go b/db/database.go index d12bba1813..a4137d4645 100644 --- a/db/database.go +++ b/db/database.go @@ -2510,29 +2510,6 @@ func (db *DatabaseContext) GetCollectionIDs() []uint32 { return maps.Keys(db.CollectionByID) } -// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0 -func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string) error { - - bucket, err := base.AsGocbV2Bucket(database.Bucket) - if err != nil { - checkpoint := checkpointPrefix + ":" + feedPrefix - err := database.MetadataStore.Delete(checkpoint) - if err != nil && !base.IsDocNotFoundError(err) { - return err - } - return nil - } - numVbuckets, err := bucket.GetMaxVbno() - if err != nil { - return err - } - - datastore := database.MetadataStore - metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, checkpointPrefix) - metadata.Purge(ctx, base.DefaultNumWorkers) - return nil -} - func (db *DatabaseContext) EnableAllowConflicts(tb testing.TB) { db.Options.AllowConflicts = base.Ptr(true) } From 54fd5888221783ead8523ff85b5385b590c94e43 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 5 May 2026 10:21:23 -0400 Subject: [PATCH 2/9] Remove attachment compaction code --- db/attachment_compaction.go | 13 ++++---- db/attachment_compaction_test.go | 22 +++++++++--- db/background_mgr_attachment_compaction.go | 39 ++++------------------ db/database.go | 23 +++++++++++++ 4 files changed, 53 insertions(+), 44 deletions(-) diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 35754b933e..2c7ff1cc38 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -424,7 +424,7 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, // attachmentCompactCleanupPhase runs a DCP feed to clean up all documents with an attachment compaction xattr. Returns // the DCP checkpoint prefix and any error encountered. -func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (base.DCPClient, error) { +func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore, collectionID uint32, db *Database, compactionID string, vbUUIDs []uint64, terminator *base.SafeTerminator) (string, error) { base.InfofCtx(ctx, base.KeyAll, "Starting third phase of attachment compaction (cleanup phase) with compactionID: %q", compactionID) compactionLoggingID := "Compaction Cleanup: " + compactionID @@ -523,15 +523,16 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) - return nil, err + return "", err } + metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() doneChan, err := dcpClient.Start() if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) // simplify close in CBG-2234 _ = dcpClient.Close() - return dcpClient, err + return metadataKeyPrefix, err } select { @@ -544,18 +545,18 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore err = dcpClient.Close() if err != nil { base.WarnfCtx(ctx, "[%s] Failed to close attachment compaction DCP client! 
%v", compactionLoggingID, err) - return dcpClient, err + return metadataKeyPrefix, err } err = <-doneChan if err != nil { - return dcpClient, err + return metadataKeyPrefix, err } base.InfofCtx(ctx, base.KeyAll, "[%s] Cleanup phase of attachment compaction was terminated", compactionLoggingID) } - return dcpClient, err + return metadataKeyPrefix, err } // getCompactionIDSubDocPath is just a tiny helper func that just concatenates the subdoc path we're using to store diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go index 8a724cabf0..3582635d80 100644 --- a/db/attachment_compaction_test.go +++ b/db/attachment_compaction_test.go @@ -203,9 +203,9 @@ func TestAttachmentCleanup(t *testing.T) { } terminator := base.NewSafeTerminator() - _, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator) + checkpointPrefix, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), nil, terminator) require.NoError(t, err) - //require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentCleanup_cleanup", base.ProductAPIVersion), checkpointPrefix) + require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentCleanup_cleanup", base.ProductAPIVersion), checkpointPrefix) for _, docID := range singleMarkedAttIDs { _, _, err := dataStore.GetXattrs(ctx, docID, []string{base.AttachmentCompactionXattrName}) @@ -391,9 +391,9 @@ func TestAttachmentMarkAndSweepAndCleanup(t *testing.T) { } } - dcpClient, err = attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator) + checkpointPrefix, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, testDb, t.Name(), vbUUIDS, terminator) require.NoError(t, err) - require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentMarkAndSweepAndCleanup_cleanup", base.ProductAPIVersion), dcpClient.GetMetadataKeyPrefix()) + require.Equal(t, fmt.Sprintf("_sync:dcp_ck::sg-%v:att_compaction:TestAttachmentMarkAndSweepAndCleanup_cleanup", base.ProductAPIVersion), checkpointPrefix) for _, attDocKey := range attKeys { var back any @@ -499,7 +499,19 @@ func TestAttachmentCompactionRunTwice(t *testing.T) { err = testDB1.AttachmentCompactionManager.Start(ctx1, map[string]any{"database": testDB1}) assert.NoError(t, err) - RequireBackgroundManagerState(t, testDB1.AttachmentCompactionManager, BackgroundProcessStateStopped) + err = WaitForConditionWithOptions(t, func() bool { + var status AttachmentManagerResponse + rawStatus, err := testDB1.AttachmentCompactionManager.GetStatus(ctx1) + assert.NoError(t, err) + err = base.JSONUnmarshal(rawStatus, &status) + require.NoError(t, err) + + if status.State == BackgroundProcessStateStopped { + return true + } + + return false + }, 200, 1000) // Kick off another run with an attempted start from the other node, checks for error on other node triggerCallback = true diff --git a/db/background_mgr_attachment_compaction.go b/db/background_mgr_attachment_compaction.go index 62f70ff2bc..becbde55bd 100644 --- a/db/background_mgr_attachment_compaction.go +++ b/db/background_mgr_attachment_compaction.go @@ -11,12 +11,10 @@ package db import ( "context" "errors" - "fmt" "sync" "time" "github.com/couchbase/gocbcore/v10" - sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" "github.com/google/uuid" ) @@ -104,31 +102,6 @@ func (a *AttachmentCompactionManager) Init(ctx context.Context, options map[stri return newRunInit() } -func (a 
*AttachmentCompactionManager) purgeCheckpoints(ctx context.Context, compactionID string, db *Database, dataStore sgbucket.DataStore) error { - fakeCallback := func(event sgbucket.FeedEvent) bool { return false } - var errs []error - for _, phase := range []attachmentCompactionPhase{MarkPhase, CleanupPhase} { - clientOptions := getCompactionDCPClientOptions( - db, - compactionID, - base.NewCollectionNameSet(dataStore), - phase, - fakeCallback, - ) - - dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) - if err != nil { - errs = append(errs, fmt.Errorf("Could not create a dcp client phase %q in order to purge checkpoints: %w", phase, err)) - continue - } - err = dcpClient.PurgeCheckpoints() - if err != nil { - errs = append(errs, fmt.Errorf("error purging checkpoints for phase %q: %w", phase, err)) - } - } - return errors.Join(errs...) -} - func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { database := options["database"].(*Database) @@ -170,7 +143,7 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return false, err, nil } - shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, dcpClient.PurgeCheckpoints) + shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, MarkPhase, dcpClient.GetMetadataKeyPrefix()) } return shouldRetry, err, nil } @@ -196,9 +169,9 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin a.SetPhase("cleanup") worker := func() (shouldRetry bool, err error, value any) { persistClusterStatus() - dcpClient, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator) - if err != nil && dcpClient != nil { - shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, dcpClient.PurgeCheckpoints) + metadataKeyPrefix, err := attachmentCompactCleanupPhase(ctx, dataStore, collectionID, database, a.CompactID, a.VBUUIDs, terminator) + if err != nil { + shouldRetry, err = a.handleAttachmentCompactionRollbackError(ctx, options, dataStore, database, err, CleanupPhase, metadataKeyPrefix) } return shouldRetry, err, nil } @@ -217,13 +190,13 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return nil } -func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase attachmentCompactionPhase, checkpointPurgeFunc base.DCPCheckpointPurgeFunc) (bool, error) { +func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase attachmentCompactionPhase, keyPrefix string) (bool, error) { var rollbackErr gocbcore.DCPRollbackError if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) { base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase) // to rollback any phase for attachment compaction we need to purge all persisted dcp metadata base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID) - err = checkpointPurgeFunc() + err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, 
keyPrefix, a.CompactID) if err != nil { base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err) return false, err diff --git a/db/database.go b/db/database.go index a4137d4645..d12bba1813 100644 --- a/db/database.go +++ b/db/database.go @@ -2510,6 +2510,29 @@ func (db *DatabaseContext) GetCollectionIDs() []uint32 { return maps.Keys(db.CollectionByID) } +// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0 +func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string) error { + + bucket, err := base.AsGocbV2Bucket(database.Bucket) + if err != nil { + checkpoint := checkpointPrefix + ":" + feedPrefix + err := database.MetadataStore.Delete(checkpoint) + if err != nil && !base.IsDocNotFoundError(err) { + return err + } + return nil + } + numVbuckets, err := bucket.GetMaxVbno() + if err != nil { + return err + } + + datastore := database.MetadataStore + metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, checkpointPrefix) + metadata.Purge(ctx, base.DefaultNumWorkers) + return nil +} + func (db *DatabaseContext) EnableAllowConflicts(tb testing.TB) { db.Options.AllowConflicts = base.Ptr(true) } From 639216af4ad3f2c6c2aaabb62b8daeb17c72cc7a Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 5 May 2026 11:03:21 -0400 Subject: [PATCH 3/9] Remove background mgr attachment migration --- db/background_mgr_attachment_migration.go | 92 +++++++++-------------- 1 file changed, 37 insertions(+), 55 deletions(-) diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index cd72f61500..9780f893be 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -54,17 +54,6 @@ func NewAttachmentMigrationManager(database *DatabaseContext) *BackgroundManager func (a *AttachmentMigrationManager) Init(ctx context.Context, options map[string]any, clusterStatus []byte) error { newRunInit := func() error { - if a.MigrationID != "" { - dcpClient, err := a.newDCPClient(ctx, a.databaseCtx) - if err != nil { - return fmt.Errorf("Could not create a DCP client when preparing to reset checkpoints: %w", err) - } - base.InfofCtx(ctx, base.KeyAll, "Attachment Migration: Resetting checkpoints for new migration run with migration ID: %s", a.MigrationID) - err = dcpClient.PurgeCheckpoints() - if err != nil { - return fmt.Errorf("Could not purge checkpoints when preparing for new migration run: %w", err) - } - } uniqueUUID, err := uuid.NewRandom() if err != nil { return err @@ -103,9 +92,19 @@ func (a *AttachmentMigrationManager) Init(ctx context.Context, options map[strin return newRunInit() } -func (a *AttachmentMigrationManager) getDCPCallback(ctx context.Context, db *DatabaseContext) sgbucket.FeedEventCallbackFunc { - migrationLoggingID := a.migrationLoggingID() - return func(event sgbucket.FeedEvent) bool { +func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { + db := a.databaseCtx + migrationLoggingID := "Migration: " + a.MigrationID + + persistClusterStatus := func() { + err := persistClusterStatusCallback(ctx) + if err != nil { + base.WarnfCtx(ctx, "[%s] Failed to persist latest cluster status for attachment migration: %v", migrationLoggingID, err) + } + } + defer persistClusterStatus() + + callback := func(event sgbucket.FeedEvent) bool { 
docID := string(event.Key) collection := db.CollectionByID[event.CollectionID] base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", migrationLoggingID, event.Opcode, base.UD(docID)) @@ -158,56 +157,26 @@ func (a *AttachmentMigrationManager) getDCPCallback(ctx context.Context, db *Dat a.docsChanged.Add(1) return true } -} -// newDCPClient creates a DCP client for the attachment migration process. -func (a *AttachmentMigrationManager) newDCPClient(ctx context.Context, db *DatabaseContext) (base.DCPClient, error) { - scopes, _, err := getCollectionsForAttachmentMigration(db) + scopes, currCollectionIDs, err := getCollectionsForAttachmentMigration(db) if err != nil { - return nil, err + return err } - dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, a.getDCPCallback(ctx, db)) - return base.NewDCPClient(ctx, db.Bucket, dcpOptions) -} - -func (a *AttachmentMigrationManager) migrationLoggingID() string { - return "Migration: " + a.MigrationID -} + dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback) -func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error { - db := a.databaseCtx - migrationLoggingID := a.migrationLoggingID() - - persistClusterStatus := func() { - err := persistClusterStatusCallback(ctx) - if err != nil { - base.WarnfCtx(ctx, "[%s] Failed to persist latest cluster status for attachment migration: %v", migrationLoggingID, err) - } - } - defer persistClusterStatus() + // check for mismatch in collection id's between current collections on the db and prev run - _, currCollectionIDs, err := getCollectionsForAttachmentMigration(db) + err = a.resetDCPMetadataIfNeeded(ctx, db, dcpOptions.CheckpointPrefix, currCollectionIDs) if err != nil { return err } - // check for mismatch in collection id's between current collections on the db and prev run - shouldPurgeCheckpoints := a.shouldResetCheckpoints(ctx, db, getMigrationDCPClientOptions(db, a.MigrationID, nil, nil).CheckpointPrefix, currCollectionIDs) a.SetCollectionIDs(currCollectionIDs) - - dcpClient, err := a.newDCPClient(ctx, db) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, dcpOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment migration DCP client: %v", migrationLoggingID, err) return err } - if shouldPurgeCheckpoints { - base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := dcpClient.PurgeCheckpoints() - if err != nil { - return err - } - } - base.DebugfCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for attachment migration", migrationLoggingID) doneChan, err := dcpClient.Start() @@ -352,18 +321,31 @@ type AttachmentMigrationManagerStatusDoc struct { AttachmentMigrationMeta `json:"meta"` } -// shouldResetCheckpoints returns true if the collection data does not match the previous data. 
-func (a *AttachmentMigrationManager) shouldResetCheckpoints(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) bool { +// resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run +func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) error { // if we are on our first run, no collections will be defined on the manager yet if len(a.CollectionIDs) == 0 { - return false + return nil } if len(a.CollectionIDs) != len(collectionIDs) { - return true + base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) + err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) + if err != nil { + return err + } + return nil } slices.Sort(collectionIDs) slices.Sort(a.CollectionIDs) - return slices.Compare(collectionIDs, a.CollectionIDs) != 0 + purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs) + if purgeNeeded != 0 { + base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) + err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) + if err != nil { + return err + } + } + return nil } // getCollectionsForAttachmentMigration will get all datastores. From cc19dbf6d6d42324a5bec4a3b1cc102f04b1e9d4 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 5 May 2026 11:38:58 -0400 Subject: [PATCH 4/9] avoid data race --- db/background_mgr_resync_dcp.go | 1 + 1 file changed, 1 insertion(+) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 41425e4d43..bf633d4e29 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -147,6 +147,7 @@ func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) { func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, regenerateSequences bool) sgbucket.FeedEventCallbackFunc { return func(event sgbucket.FeedEvent) bool { + ctx := ctx // copy ctx so it doesn't get modified by multiple copies of this function running simultaneously docID := string(event.Key) base.TracefCtx(ctx, base.KeyAll, "Resync: Received DCP event %d for doc %v", event.Opcode, base.UD(docID)) From 078664a8a9cf038be0d1f75dadaa9f9b7fabd51a Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 5 May 2026 11:42:29 -0400 Subject: [PATCH 5/9] add comments --- base/dcp_client.go | 3 -- base/dcp_sharded.go | 3 +- db/background_mgr_resync_dcp.go | 51 +++------------------------------ 3 files changed, 6 insertions(+), 51 deletions(-) diff --git a/base/dcp_client.go b/base/dcp_client.go index 4302a648e1..b18ded7188 100644 --- a/base/dcp_client.go +++ b/base/dcp_client.go @@ -18,9 +18,6 @@ import ( "github.com/couchbaselabs/rosmar" ) -// DCPCheckpointPurgeFunc allows purging of DCP Checkpoints -type DCPCheckpointPurgeFunc func() error - // DCPClient is an interface for all DCP implementations. type DCPClient interface { // Start will start the DCP feed. It returns a channel marking the end of the feed. 
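Note: the dcp_sharded.go hunk below renames DeleteShardedDCPCheckpoints to PurgeShardedDCPCheckpoints and documents it. The pattern it relies on: cbgt persists one checkpoint document per vBucket under <checkpointPrefix>_<vbNo>, and the purge loop gathers per-key failures with errors.Join instead of stopping at the first error. A standalone sketch under those assumptions (del stands in for DataStore.Delete and the prefix is an example value; unlike the real function, the sketch does not special-case doc-not-found errors):

package main

import (
	"errors"
	"fmt"
)

// purgePerVBucketCheckpoints sketches the loop inside PurgeShardedDCPCheckpoints:
// issue one delete per vBucket checkpoint document and accumulate failures so a
// single bad key does not leave the remaining checkpoints behind.
func purgePerVBucketCheckpoints(del func(key string) error, vbCount uint16, prefix string) error {
	var errs []error
	for vbNo := range vbCount {
		checkpointID := fmt.Sprintf("%s_%d", prefix, vbNo)
		if err := del(checkpointID); err != nil {
			errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err))
		}
	}
	return errors.Join(errs...) // nil when no deletes failed
}

func main() {
	var deleted []string
	del := func(key string) error {
		deleted = append(deleted, key)
		return nil
	}
	fmt.Println(purgePerVBucketCheckpoints(del, 4, "_sync:dcp_ck:example"), deleted)
}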
diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 908dcc1d92..6135b28228 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -922,7 +922,8 @@ func (meh *sgMgrEventHandlers) OnFeedError(_ string, r cbgt.Feed, feedErr error) } } -func DeleteShardedDCPCheckpoints(ctx context.Context, datastore DataStore, vbCount uint16, checkpointPrefix string) error { +// PurgeShardedDCPCheckpoints removes the checkpoints created by a cbgt based DCP feed. +func PurgeShardedDCPCheckpoints(ctx context.Context, datastore DataStore, vbCount uint16, checkpointPrefix string) error { var errs []error for vbNo := range vbCount { checkpointID := fmt.Sprintf("%s_%d", checkpointPrefix, vbNo) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index bf633d4e29..9ce3bd3e63 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -119,6 +119,7 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu return nil } +// purgeCheckpoints removes checkpoints for a given resyncID. func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error { if resyncID == "" { return errors.New("resyncID is required to delete checkpoints") @@ -135,7 +136,7 @@ func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, r if err != nil { return fmt.Errorf("error getting vb count for checkpoint deletion: %v", err) } - return base.DeleteShardedDCPCheckpoints(ctx, db.MetadataStore, vbCount, checkpointPrefix) + return base.PurgeShardedDCPCheckpoints(ctx, db.MetadataStore, vbCount, checkpointPrefix) } // SetVBUUIDs updates vbuuids in the manager. @@ -145,6 +146,7 @@ func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) { r.VBUUIDs = vbuuids } +// getDCPCallback returns function to process DCP events for resync. func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, regenerateSequences bool) sgbucket.FeedEventCallbackFunc { return func(event sgbucket.FeedEvent) bool { ctx := ctx // copy ctx so it doesn't get modified by multiple copies of this function running simultaneously @@ -248,51 +250,6 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers defer atomic.CompareAndSwapUint32(&db.State, DBResyncing, DBOffline) - callback := func(event sgbucket.FeedEvent) bool { - docID := string(event.Key) - base.TracefCtx(ctx, base.KeyAll, "Resync: Received DCP event %d for doc %v", event.Opcode, base.UD(docID)) - - // Ignore documents without xattrs if possible, to avoid processing unnecessary documents - if r.useXattrs && event.DataType&base.MemcachedDataTypeXattr == 0 { - return true - } - // Don't want to process raw binary docs - // The binary check should suffice but for additional safety also check for empty bodies. This will also avoid - // processing tombstones. - if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 { - return true - } - - // We only want to process full docs. Not any sync docs. 
- if strings.HasPrefix(docID, base.SyncDocPrefix) { - return true - } - - r.DocsProcessed.Add(1) - db.DbStats.Database().ResyncNumProcessed.Add(1) - databaseCollection := db.CollectionByID[event.CollectionID] - databaseCollection.collectionStats.ResyncNumProcessed.Add(1) - ctx := databaseCollection.AddCollectionContext(ctx) - doc, err := bucketDocumentFromFeed(event) - if err != nil { - base.WarnfCtx(ctx, "Resync: Error getting document from DCP event for doc %q: %v", base.UD(docID), err) - return false - } - err = (&DatabaseCollectionWithUser{ - DatabaseCollection: databaseCollection, - }).ResyncDocument(ctx, docID, doc, regenerateSequences) - - if err == nil { - r.DocsChanged.Add(1) - db.DbStats.Database().ResyncNumChanged.Add(1) - databaseCollection.collectionStats.ResyncNumChanged.Add(1) - } else if err != base.ErrUpdateCancel { - base.WarnfCtx(ctx, "Resync: Error updating doc %q: %v", base.UD(docID), err) - return false - } - return true - } - if r.hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "running resync against all collections") } else { @@ -321,7 +278,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers checkPointPrefix := GetResyncDCPCheckpointPrefix(db.DatabaseContext, r.ResyncID, true) resyncDestFunc := func(janitorRollback func()) (cbgt.Dest, error) { - resyncDest, err := base.NewDCPDest(ctx, callback, db.MetadataStore, db.numVBuckets, true, nil, nil, checkPointPrefix) + resyncDest, err := base.NewDCPDest(ctx, r.getDCPCallback(ctx, db, regenerateSequences), db.MetadataStore, db.numVBuckets, true, nil, nil, checkPointPrefix) if err != nil { return nil, fmt.Errorf("Error creating resync dest: %v", err) } From ba7536f628ddff7683b722faed2d0eb5cfd7311f Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 6 May 2026 11:57:35 -0400 Subject: [PATCH 6/9] Use sharded DCP --- base/collection_gocb.go | 5 + base/dcp_common.go | 60 ++++++++++ base/dcp_sharded.go | 16 --- db/background_mgr_attachment_compaction.go | 12 +- db/background_mgr_attachment_migration.go | 8 +- db/background_mgr_resync_dcp.go | 131 ++++++++------------- db/database.go | 40 +++---- 7 files changed, 150 insertions(+), 122 deletions(-) diff --git a/base/collection_gocb.go b/base/collection_gocb.go index 57c930b845..70d8c5437b 100644 --- a/base/collection_gocb.go +++ b/base/collection_gocb.go @@ -608,3 +608,8 @@ func (c *Collection) setCollectionID() error { } return nil } + +// GetMaxVbno returns the number of vBuckets on this datastore. +func (c *Collection) GetMaxVbno() (uint16, error) { + return c.Bucket.GetMaxVbno() +} diff --git a/base/dcp_common.go b/base/dcp_common.go index cd4706f0ab..c59b502aee 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -13,6 +13,7 @@ package base import ( "bytes" "context" + "errors" "expvar" "fmt" "sync" @@ -22,6 +23,14 @@ import ( "github.com/google/uuid" ) +type DCPFeedMode string + +const ( + DCPFeedGocb DCPFeedMode = "gocb" + DCPFeedRosmar DCPFeedMode = "rosmar" + DCPFeedSharded DCPFeedMode = "cbgt" +) + // Number of non-checkpoint updates per vbucket required to trigger metadata persistence. Must be greater than zero to avoid // retriggering persistence solely based on checkpoint doc echo. 
 // Based on ad-hoc testing w/ travel-sample bucket, increasing this value doesn't significantly improve performance, since under load
@@ -282,3 +291,53 @@ func GenerateDcpStreamName(feedID string) (string, error) {
 	}
 	return feedName, nil
 }
+
+// PurgeDCPCheckpoints purges all DCP metadata from a previous run in the bucket, used to reset the DCP client back to zero.
+func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPrefix string, feedPrefix string, feedMode DCPFeedMode) error {
+
+	switch feedMode {
+	case DCPFeedRosmar:
+		checkpoint := checkpointPrefix + ":" + feedPrefix
+		err := datastore.Delete(checkpoint)
+		if err != nil && !IsDocNotFoundError(err) {
+			return err
+		}
+		return nil
+	case DCPFeedGocb:
+		collection, err := AsCollection(datastore)
+		if err != nil {
+			return RedactErrorf("dataStore %q is not a gocb collection: type %T", MD(datastore.GetName()), datastore)
+		}
+		numVbuckets, err := collection.GetMaxVbno()
+		if err != nil {
+			return err
+		}
+
+		metadata := NewDCPMetadataCS(ctx, datastore, numVbuckets, DefaultNumWorkers, checkpointPrefix)
+		metadata.Purge(ctx, DefaultNumWorkers)
+		return nil
+	case DCPFeedSharded:
+		collection, err := AsCollection(datastore)
+		if err != nil {
+			return RedactErrorf("dataStore %q is not a gocb collection: type %T", MD(datastore.GetName()), datastore)
+		}
+		numVbuckets, err := collection.GetMaxVbno()
+		if err != nil {
+			return err
+		}
+		var errs []error
+		for vbNo := range numVbuckets {
+			checkpointID := fmt.Sprintf("%s_%d", checkpointPrefix, vbNo)
+			err := datastore.Delete(checkpointID)
+			if err != nil && !IsDocNotFoundError(err) {
+				errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err))
+			}
+		}
+		if errs != nil {
+			return errors.Join(errs...)
+		}
+		return nil
+	default:
+		return fmt.Errorf("Unrecognized dcp feed mode: %s", feedMode)
+	}
+}
diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go
index 6135b28228..0917f844d5 100644
--- a/base/dcp_sharded.go
+++ b/base/dcp_sharded.go
@@ -921,19 +921,3 @@ func (meh *sgMgrEventHandlers) OnFeedError(_ string, r cbgt.Feed, feedErr error)
 		dcpFeed.NotifyMgrOnClose()
 	}
 }
-
-// PurgeShardedDCPCheckpoints removes the checkpoints created by a cbgt based DCP feed.
-func PurgeShardedDCPCheckpoints(ctx context.Context, datastore DataStore, vbCount uint16, checkpointPrefix string) error {
-	var errs []error
-	for vbNo := range vbCount {
-		checkpointID := fmt.Sprintf("%s_%d", checkpointPrefix, vbNo)
-		err := datastore.Delete(checkpointID)
-		if err != nil && !IsDocNotFoundError(err) {
-			errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err))
-		}
-	}
-	if errs != nil {
-		return errors.Join(errs...)
- } - return nil -} diff --git a/db/background_mgr_attachment_compaction.go b/db/background_mgr_attachment_compaction.go index becbde55bd..17d71966fb 100644 --- a/db/background_mgr_attachment_compaction.go +++ b/db/background_mgr_attachment_compaction.go @@ -190,13 +190,23 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return nil } +func (*AttachmentCompactionManager) purgeCheckpoints(ctx context.Context, database *Database, checkpointPrefix string, feedID string) error { + return base.PurgeDCPCheckpoints( + ctx, + database.MetadataStore, + checkpointPrefix, + feedID, + database.dcpFeedMode(), + ) +} + func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ctx context.Context, options map[string]any, dataStore base.DataStore, database *Database, err error, phase attachmentCompactionPhase, keyPrefix string) (bool, error) { var rollbackErr gocbcore.DCPRollbackError if errors.As(err, &rollbackErr) || errors.Is(err, base.ErrVbUUIDMismatch) { base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase) // to rollback any phase for attachment compaction we need to purge all persisted dcp metadata base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID) - err = PurgeDCPCheckpoints(ctx, database.DatabaseContext, keyPrefix, a.CompactID) + err = a.purgeCheckpoints(ctx, database, keyPrefix, a.CompactID) if err != nil { base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err) return false, err diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index 9780f893be..200f6b7885 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -321,6 +321,10 @@ type AttachmentMigrationManagerStatusDoc struct { AttachmentMigrationMeta `json:"meta"` } +func (a *AttachmentMigrationManager) purgeCheckpoints(ctx context.Context, db *Database, checkpointPrefix string, feedPrefix string) error { + return base.PurgeDCPCheckpoints(ctx, db.MetadataStore, checkpointPrefix, feedPrefix, db.dcpFeedMode()) +} + // resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Context, database *DatabaseContext, metadataKeyPrefix string, collectionIDs []uint32) error { // if we are on our first run, no collections will be defined on the manager yet @@ -329,7 +333,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex } if len(a.CollectionIDs) != len(collectionIDs) { base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) + err := base.PurgeDCPCheckpoints(ctx, database.MetadataStore, metadataKeyPrefix, a.MigrationID, database.dcpFeedMode()) if err != nil { return err } @@ -340,7 +344,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs) if purgeNeeded != 0 { base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := PurgeDCPCheckpoints(ctx, database, metadataKeyPrefix, a.MigrationID) + err := base.PurgeDCPCheckpoints(ctx, database.MetadataStore, metadataKeyPrefix, a.MigrationID, database.dcpFeedMode()) if err != nil { return err } diff 
--git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 9ce3bd3e63..3f6e0411bd 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -119,24 +119,14 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu return nil } -// purgeCheckpoints removes checkpoints for a given resyncID. func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error { - if resyncID == "" { - return errors.New("resyncID is required to delete checkpoints") - } - checkpointPrefix := GetResyncDCPCheckpointPrefix(db.DatabaseContext, resyncID, r.Distributed) - if !r.Distributed { - dcpClient, err := r.newDCPClient(ctx, db, false) - if err != nil { - return fmt.Errorf("error creating DCP client to delete checkpoints for resync ID %q: %v", resyncID, err) - } - return dcpClient.PurgeCheckpoints() - } - vbCount, err := db.Bucket.GetMaxVbno() - if err != nil { - return fmt.Errorf("error getting vb count for checkpoint deletion: %v", err) - } - return base.PurgeShardedDCPCheckpoints(ctx, db.MetadataStore, vbCount, checkpointPrefix) + return base.PurgeDCPCheckpoints( + ctx, + db.MetadataStore, + GetResyncDCPCheckpointPrefix(db.DatabaseContext, resyncID, r.Distributed), + resyncID, + db.distributedDCPFeedMode(), + ) } // SetVBUUIDs updates vbuuids in the manager. @@ -146,10 +136,45 @@ func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) { r.VBUUIDs = vbuuids } -// getDCPCallback returns function to process DCP events for resync. -func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, regenerateSequences bool) sgbucket.FeedEventCallbackFunc { - return func(event sgbucket.FeedEvent) bool { - ctx := ctx // copy ctx so it doesn't get modified by multiple copies of this function running simultaneously +// Run starts a DCP feed to process documents for resync. 
+func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) (err error) { + db, ok := options["database"].(*Database) + if !ok { + return errors.New("database option is required and must be of type *Database") + } + regenerateSequences, ok := options["regenerateSequences"].(bool) + if !ok { + return errors.New("regenerateSequences option is required and must be of type bool") + } + resyncCollections, ok := options["collections"].(base.CollectionNames) + if !ok { + return errors.New("collections option is required and must be of type CollectionNames") + } + ctx = context.WithoutCancel(ctx) // drop cancellation from parent context + ctx = base.CorrelationIDLogCtx(ctx, r.ResyncID) + ctx, cancelResync := context.WithCancelCause(ctx) + defer func() { + if err != nil { + cancelResync(err) + } else { + cancelResync(errors.New("resync ended normally")) + } + }() + + var doneChan chan error + var dcpClient base.DCPClient + + persistClusterStatus := func() { + err := persistClusterStatusCallback(ctx) + if err != nil { + base.WarnfCtx(ctx, "Failed to persist cluster status on-demand for resync operation: %v", err) + } + } + defer persistClusterStatus() + + defer atomic.CompareAndSwapUint32(&db.State, DBResyncing, DBOffline) + + callback := func(event sgbucket.FeedEvent) bool { docID := string(event.Key) base.TracefCtx(ctx, base.KeyAll, "Resync: Received DCP event %d for doc %v", event.Opcode, base.UD(docID)) @@ -173,7 +198,7 @@ func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, reg db.DbStats.Database().ResyncNumProcessed.Add(1) databaseCollection := db.CollectionByID[event.CollectionID] databaseCollection.collectionStats.ResyncNumProcessed.Add(1) - ctx = databaseCollection.AddCollectionContext(ctx) + ctx := databaseCollection.AddCollectionContext(ctx) doc, err := bucketDocumentFromFeed(event) if err != nil { base.WarnfCtx(ctx, "Resync: Error getting document from DCP event for doc %q: %v", base.UD(docID), err) @@ -193,62 +218,6 @@ func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, reg } return true } -} - -// newDCPClient creates a DCP client for resync with the appropriate options and callback. -func (r *ResyncManagerDCP) newDCPClient(ctx context.Context, db *Database, regenerateSequences bool) (base.DCPClient, error) { - clientOptions := getResyncDCPClientOptions( - db.DatabaseContext, - r.ResyncID, - r.ResyncedCollections.ToCollectionNameSet(), - r.getDCPCallback(ctx, db, regenerateSequences), - r.Distributed, - ) - dcpClient, err := base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions) - if err != nil { - base.WarnfCtx(ctx, "Failed to create resync DCP client! %v", err) - return nil, err - } - return dcpClient, nil -} - -// Run starts a DCP feed to process documents for resync. 
-func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) (err error) { - db, ok := options["database"].(*Database) - if !ok { - return errors.New("database option is required and must be of type *Database") - } - regenerateSequences, ok := options["regenerateSequences"].(bool) - if !ok { - return errors.New("regenerateSequences option is required and must be of type bool") - } - resyncCollections, ok := options["collections"].(base.CollectionNames) - if !ok { - return errors.New("collections option is required and must be of type CollectionNames") - } - ctx = context.WithoutCancel(ctx) // drop cancellation from parent context - ctx = base.CorrelationIDLogCtx(ctx, r.ResyncID) - ctx, cancelResync := context.WithCancelCause(ctx) - defer func() { - if err != nil { - cancelResync(err) - } else { - cancelResync(errors.New("resync ended normally")) - } - }() - - var doneChan chan error - var dcpClient base.DCPClient - - persistClusterStatus := func() { - err := persistClusterStatusCallback(ctx) - if err != nil { - base.WarnfCtx(ctx, "Failed to persist cluster status on-demand for resync operation: %v", err) - } - } - defer persistClusterStatus() - - defer atomic.CompareAndSwapUint32(&db.State, DBResyncing, DBOffline) if r.hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "running resync against all collections") @@ -278,7 +247,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers checkPointPrefix := GetResyncDCPCheckpointPrefix(db.DatabaseContext, r.ResyncID, true) resyncDestFunc := func(janitorRollback func()) (cbgt.Dest, error) { - resyncDest, err := base.NewDCPDest(ctx, r.getDCPCallback(ctx, db, regenerateSequences), db.MetadataStore, db.numVBuckets, true, nil, nil, checkPointPrefix) + resyncDest, err := base.NewDCPDest(ctx, callback, db.MetadataStore, db.numVBuckets, true, nil, nil, checkPointPrefix) if err != nil { return nil, fmt.Errorf("Error creating resync dest: %v", err) } @@ -330,7 +299,9 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers defer resyncCbgtContext.Stop(ctx) } else { - dcpClient, err = r.newDCPClient(ctx, db, regenerateSequences) + clientOptions := getResyncDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false) + var err error + dcpClient, err = base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "Failed to create resync DCP client! 
%v", err) return err diff --git a/db/database.go b/db/database.go index d12bba1813..c4ce190dd2 100644 --- a/db/database.go +++ b/db/database.go @@ -2510,29 +2510,6 @@ func (db *DatabaseContext) GetCollectionIDs() []uint32 { return maps.Keys(db.CollectionByID) } -// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0 -func PurgeDCPCheckpoints(ctx context.Context, database *DatabaseContext, checkpointPrefix string, feedPrefix string) error { - - bucket, err := base.AsGocbV2Bucket(database.Bucket) - if err != nil { - checkpoint := checkpointPrefix + ":" + feedPrefix - err := database.MetadataStore.Delete(checkpoint) - if err != nil && !base.IsDocNotFoundError(err) { - return err - } - return nil - } - numVbuckets, err := bucket.GetMaxVbno() - if err != nil { - return err - } - - datastore := database.MetadataStore - metadata := base.NewDCPMetadataCS(ctx, datastore, numVbuckets, base.DefaultNumWorkers, checkpointPrefix) - metadata.Purge(ctx, base.DefaultNumWorkers) - return nil -} - func (db *DatabaseContext) EnableAllowConflicts(tb testing.TB) { db.Options.AllowConflicts = base.Ptr(true) } @@ -2591,3 +2568,20 @@ func (db *DatabaseContext) usingRosmar() bool { func (db *DatabaseContext) WaitForSequenceNotSkipped(ctx context.Context, targetSequence uint64) error { return db.changeCache.waitForSequenceNotSkipped(ctx, targetSequence, defaultWaitForSequence) } + +func (db *DatabaseContext) dcpFeedMode() base.DCPFeedMode { + if db.usingRosmar() { + return base.DCPFeedRosmar + } + return base.DCPFeedGocb +} + +func (db *DatabaseContext) distributedDCPFeedMode() base.DCPFeedMode { + if db.usingRosmar() { + return base.DCPFeedRosmar + } + if db.useShardedDCP() { + return base.DCPFeedSharded + } + return base.DCPFeedGocb +} From f1f2e2a5faedb0479fdc3abc1c93a61ca20fb52d Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 6 May 2026 12:41:01 -0400 Subject: [PATCH 7/9] go lint --- base/dcp_common.go | 1 - 1 file changed, 1 deletion(-) diff --git a/base/dcp_common.go b/base/dcp_common.go index c59b502aee..dc6db8764b 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -340,5 +340,4 @@ func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPre default: return fmt.Errorf("Unrecognized dcp feed mode: %s", feedMode) } - return nil } From 1571b95ddfab65192e2fb8886555e8d16cf296fa Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 6 May 2026 15:11:10 -0400 Subject: [PATCH 8/9] make comments --- base/dcp_common.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/base/dcp_common.go b/base/dcp_common.go index dc6db8764b..58f764cfc2 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -23,11 +23,15 @@ import ( "github.com/google/uuid" ) +// DCPFeedMode describes the types of DCP feed that can be run. type DCPFeedMode string const ( - DCPFeedGocb DCPFeedMode = "gocb" - DCPFeedRosmar DCPFeedMode = "rosmar" + // DCPFeedGocb represents a single node DCP feed for a Couchbase Server bucket. + DCPFeedGocb DCPFeedMode = "gocb" + // DCPFeedRosmar represents a DCP feed for a rosmar bucket. + DCPFeedRosmar DCPFeedMode = "rosmar" + // DCPFeedSharded represents a cbgt-based DCP feed for a Couchbase Server bucket. 
DCPFeedSharded DCPFeedMode = "cbgt" ) @@ -292,7 +296,8 @@ func GenerateDcpStreamName(feedID string) (string, error) { return feedName, nil } -// PurgeDCPCheckpoints will purge all DCP metadata from previous run in the bucket, used to reset dcp client to 0 +// PurgeDCPCheckpoints will purge all DCP metadata from previous run a bucket. If the checkpoints are not present, this +// is not an error. func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPrefix string, feedPrefix string, feedMode DCPFeedMode) error { switch feedMode { From bc5208fa8c99faefd130dac413f10d1e2e059f53 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 6 May 2026 15:27:43 -0400 Subject: [PATCH 9/9] cleanup code --- base/dcp_common.go | 6 +-- db/attachment_compaction.go | 6 ++- db/background_mgr_attachment_compaction.go | 3 +- db/background_mgr_attachment_migration.go | 46 ++++++++++++------- ...ackground_mgr_attachment_migration_test.go | 5 +- db/background_mgr_resync_dcp.go | 12 +++-- db/background_mgr_resync_dcp_test.go | 2 +- db/database.go | 2 + 8 files changed, 55 insertions(+), 27 deletions(-) diff --git a/base/dcp_common.go b/base/dcp_common.go index 58f764cfc2..fc3e9b429a 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -311,7 +311,7 @@ func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPre case DCPFeedGocb: collection, err := AsCollection(datastore) if err != nil { - return RedactErrorf("dataStore %q is not a gocb collection: type %t", MD(datastore.GetName()), datastore) + return RedactErrorf("dataStore %q is not a gocb collection: type %T", MD(datastore.GetName()), datastore) } numVbuckets, err := collection.GetMaxVbno() if err != nil { @@ -324,7 +324,7 @@ func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPre case DCPFeedSharded: collection, err := AsCollection(datastore) if err != nil { - return RedactErrorf("dataStore %q is not a gocb collection: type %t", MD(datastore.GetName()), datastore) + return RedactErrorf("dataStore %q is not a gocb collection: type %T", MD(datastore.GetName()), datastore) } numVbuckets, err := collection.GetMaxVbno() if err != nil { @@ -332,7 +332,7 @@ func PurgeDCPCheckpoints(ctx context.Context, datastore DataStore, checkpointPre } var errs []error for vbNo := range numVbuckets { - checkpointID := fmt.Sprintf("%s_%d", checkpointPrefix, vbNo) + checkpointID := fmt.Sprintf("%s%d", checkpointPrefix, vbNo) err := datastore.Delete(checkpointID) if err != nil && !IsDocNotFoundError(err) { errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err)) diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 2c7ff1cc38..e6d54dbdf1 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -565,6 +565,10 @@ func getCompactionIDSubDocPath(compactionID string) string { return base.AttachmentCompactionXattrName + "." + CompactionIDKey + "." 
+ compactionID } +func getAttachmentCompactionFeedID(compactionID string, phase attachmentCompactionPhase) string { + return fmt.Sprintf("att_compaction:%v_%v", compactionID, phase) +} + // getCompactionDCPClientOptions returns the default set of DCPClientOptions suitable for attachment compaction func getCompactionDCPClientOptions(db *Database, compactionID string, collectionNames base.CollectionNameSet, phase attachmentCompactionPhase, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions { return base.DCPClientOptions{ @@ -573,7 +577,7 @@ func getCompactionDCPClientOptions(db *Database, compactionID string, collection MetadataStoreType: base.DCPMetadataStoreCS, CollectionNames: collectionNames, Callback: callback, - FeedID: fmt.Sprintf("att_compaction:%v_%v", compactionID, phase), + FeedID: getAttachmentCompactionFeedID(compactionID, phase), CheckpointPrefix: GetAttachmentCompactionDCPCheckpointPrefix(db.DatabaseContext, compactionID, phase), } } diff --git a/db/background_mgr_attachment_compaction.go b/db/background_mgr_attachment_compaction.go index 17d71966fb..2cb02c4146 100644 --- a/db/background_mgr_attachment_compaction.go +++ b/db/background_mgr_attachment_compaction.go @@ -190,6 +190,7 @@ func (a *AttachmentCompactionManager) Run(ctx context.Context, options map[strin return nil } +// purgeCheckpoints removes the checkpoints for a specific checkpointPrefix and feed name. func (*AttachmentCompactionManager) purgeCheckpoints(ctx context.Context, database *Database, checkpointPrefix string, feedID string) error { return base.PurgeDCPCheckpoints( ctx, @@ -206,7 +207,7 @@ func (a *AttachmentCompactionManager) handleAttachmentCompactionRollbackError(ct base.InfofCtx(ctx, base.KeyDCP, "rollback indicated on %s phase of attachment compaction, resetting the task", phase) // to rollback any phase for attachment compaction we need to purge all persisted dcp metadata base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.CompactID) - err = a.purgeCheckpoints(ctx, database, keyPrefix, a.CompactID) + err = a.purgeCheckpoints(ctx, database, keyPrefix, getAttachmentCompactionFeedID(a.CompactID, phase)) if err != nil { base.WarnfCtx(ctx, "error occurred during purging of dcp metadata: %s", err) return false, err diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index 200f6b7885..4805558a88 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -162,7 +162,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string if err != nil { return err } - dcpOptions := getMigrationDCPClientOptions(db, a.MigrationID, scopes, callback) + dcpOptions := a.getDCPClientOptions(a.MigrationID, scopes, callback) // check for mismatch in collection id's between current collections on the db and prev run @@ -286,21 +286,31 @@ func (a *AttachmentMigrationManager) GetProcessStatus(status BackgroundManagerSt return statusJSON, metaJSON, err } +// getCheckpointPrefix returns the checkpoint prefix for attachment migration checkpoints. 
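+// The prefix combines the database's DCP checkpoint metadata prefix, the product API version and the migration ID,
+// so checkpoints from different migration runs do not collide.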
+func (a *AttachmentMigrationManager) getCheckpointPrefix(migrationID string) string { + return fmt.Sprintf("%s:sg-%v:att_migration:%v", + a.databaseCtx.MetadataKeys.DCPCheckpointPrefix(a.databaseCtx.Options.GroupID), + base.ProductAPIVersion, + migrationID, + ) +} + +// DCPFeedID returns the feed ID used by the attachment migration DCP feed. +func (a *AttachmentMigrationManager) DCPFeedID(migrationID string) string { + return fmt.Sprintf("att_migration:%v", migrationID) +} + -// getMigrationDCPClientOptions returns options for DCP client for attachment migration. CollectionIDs represent the Couchbase Server -// CollectionIDs and prefix represents the checkpoint prefix for checkpoint documents. +// getDCPClientOptions returns options for a DCP client for attachment migration. scopes represents the Couchbase Server +// collection names, and CheckpointPrefix identifies the checkpoint documents. -func getMigrationDCPClientOptions(db *DatabaseContext, migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions { +func (a *AttachmentMigrationManager) getDCPClientOptions(migrationID string, scopes base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc) base.DCPClientOptions { return base.DCPClientOptions{ - FeedID: fmt.Sprintf("att_migration:%v", migrationID), + FeedID: a.DCPFeedID(migrationID), OneShot: true, FailOnRollback: false, MetadataStoreType: base.DCPMetadataStoreCS, CollectionNames: scopes, - CheckpointPrefix: fmt.Sprintf("%s:sg-%v:att_migration:%v", - db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID), - base.ProductAPIVersion, - migrationID, - ), - Callback: callback, + CheckpointPrefix: a.getCheckpointPrefix(migrationID), + Callback: callback, } } @@ -321,8 +331,15 @@ type AttachmentMigrationManagerStatusDoc struct { AttachmentMigrationMeta `json:"meta"` } -func (a *AttachmentMigrationManager) purgeCheckpoints(ctx context.Context, db *Database, checkpointPrefix string, feedPrefix string) error { - return base.PurgeDCPCheckpoints(ctx, db.MetadataStore, checkpointPrefix, feedPrefix, db.dcpFeedMode()) +// purgeCheckpoints will remove the checkpoints for a specific migration ID.
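+// Checkpoints that are already absent are not treated as an error, matching the behaviour of
+// base.PurgeDCPCheckpoints.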
+func (a *AttachmentMigrationManager) purgeCheckpoints(ctx context.Context, db *DatabaseContext, migrationID string) error { + return base.PurgeDCPCheckpoints( + ctx, + db.MetadataStore, + a.getCheckpointPrefix(migrationID), + migrationID, + db.dcpFeedMode(), + ) +} // resetDCPMetadataIfNeeded will check for mismatch between current collectionIDs and collectionIDs on previous run @@ -333,7 +350,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex } if len(a.CollectionIDs) != len(collectionIDs) { base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := base.PurgeDCPCheckpoints(ctx, database.MetadataStore, metadataKeyPrefix, a.MigrationID, database.dcpFeedMode()) + err := a.purgeCheckpoints(ctx, database, a.MigrationID) if err != nil { return err } @@ -344,10 +361,7 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex purgeNeeded := slices.Compare(collectionIDs, a.CollectionIDs) if purgeNeeded != 0 { base.InfofCtx(ctx, base.KeyDCP, "Purging invalid checkpoints for background task run %s", a.MigrationID) - err := base.PurgeDCPCheckpoints(ctx, database.MetadataStore, metadataKeyPrefix, a.MigrationID, database.dcpFeedMode()) - if err != nil { - return err - } + return a.purgeCheckpoints(ctx, database, a.MigrationID) } return nil } diff --git a/db/background_mgr_attachment_migration_test.go b/db/background_mgr_attachment_migration_test.go index 09db21a5d8..1db42ebd4f 100644 --- a/db/background_mgr_attachment_migration_test.go +++ b/db/background_mgr_attachment_migration_test.go @@ -296,8 +296,9 @@ func TestAttachmentMigrationCheckpointPrefix(t *testing.T) { ) require.NoError(t, err) defer db.Close(ctx) - clientOptions := getMigrationDCPClientOptions( - db, + + mgr := NewAttachmentMigrationManager(db) + clientOptions := mgr.Process.(*AttachmentMigrationManager).getDCPClientOptions( migrationID, db.collectionNameSet(), func(sgbucket.FeedEvent) bool { diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 3f6e0411bd..15ac22afb2 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -119,6 +119,7 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu return nil } +// purgeCheckpoints removes checkpoints for a given resync run. func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error { return base.PurgeDCPCheckpoints( ctx, @@ -299,7 +300,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers defer resyncCbgtContext.Stop(ctx) } else { - clientOptions := getResyncDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false) + clientOptions := r.getDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false) var err error dcpClient, err = base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions) if err != nil { @@ -521,12 +522,17 @@ func initializePrincipalDocsIndex(ctx context.Context, db *Database) error { return InitializeIndexes(ctx, n1qlStore, options) } +// DCPFeedID returns the feed ID used by the resync DCP feed. +func (r *ResyncManagerDCP) DCPFeedID(resyncID string) string { + return fmt.Sprintf("resync:%v", resyncID) +} + -// getResyncDCPClientOptions returns the default set of DCPClientOptions suitable for resync. collectionIDs -// represent Couchbase Server collection IDs. prefix represents the prefixed name of the checkpoint documents -// used to store DCP checkpoints. +// getDCPClientOptions returns the default set of DCPClientOptions suitable for resync. collectionNames represents +// the Couchbase Server collection names, and CheckpointPrefix names the prefixed checkpoint documents used to +// store DCP checkpoints. -func getResyncDCPClientOptions(db *DatabaseContext, resyncID string, collectionNames base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc, distributed bool) base.DCPClientOptions { +func (r *ResyncManagerDCP) getDCPClientOptions(db *DatabaseContext, resyncID string, collectionNames base.CollectionNameSet, callback sgbucket.FeedEventCallbackFunc, distributed bool) base.DCPClientOptions { return base.DCPClientOptions{ - FeedID: fmt.Sprintf("resync:%v", resyncID), + FeedID: r.DCPFeedID(resyncID), OneShot: true, FailOnRollback: false, MetadataStoreType: base.DCPMetadataStoreCS, diff --git a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go index c4a5c90e1b..4cff49643d 100644 --- a/db/background_mgr_resync_dcp_test.go +++ b/db/background_mgr_resync_dcp_test.go @@ -738,7 +738,7 @@ func TestResyncCheckpointPrefix(t *testing.T) { ) require.NoError(t, err) defer db.Close(ctx) - clientOptions := getResyncDCPClientOptions( + clientOptions := db.ResyncManager.Process.(*ResyncManagerDCP).getDCPClientOptions( db, resyncID, test.collectionNames, diff --git a/db/database.go b/db/database.go index c4ce190dd2..6749fa2ffa 100644 --- a/db/database.go +++ b/db/database.go @@ -2569,6 +2569,7 @@ func (db *DatabaseContext) WaitForSequenceNotSkipped(ctx context.Context, target return db.changeCache.waitForSequenceNotSkipped(ctx, targetSequence, defaultWaitForSequence) } +// dcpFeedMode returns the DCPFeedMode for the backing store when the DCP feed is always single-node. func (db *DatabaseContext) dcpFeedMode() base.DCPFeedMode { if db.usingRosmar() { return base.DCPFeedRosmar } return base.DCPFeedGocb } @@ -2576,6 +2577,7 @@ func (db *DatabaseContext) dcpFeedMode() base.DCPFeedMode { return base.DCPFeedGocb } +// distributedDCPFeedMode returns the DCPFeedMode for the backing store when sharded DCP feeds are supported. func (db *DatabaseContext) distributedDCPFeedMode() base.DCPFeedMode { if db.usingRosmar() { return base.DCPFeedRosmar