Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/dcp_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func init() {

// SGDest is the destination interface Sync Gateway implements for cbgt-managed
// DCP feeds: the base cbgt.Dest callbacks plus the extended cbgt.DestEx set.
type SGDest interface {
cbgt.Dest
cbgt.DestEx
}

// DCPDest implements SGDest (superset of cbgt.Dest) interface to manage updates coming from a
Expand Down
23 changes: 20 additions & 3 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"sync"
"time"

"errors"

"github.com/couchbase/cbgt"
sgbucket "github.com/couchbase/sg-bucket"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -233,7 +234,7 @@ func createCBGTIndex(ctx context.Context, c *CbgtContext, opts ShardedDCPOptions

vbNo, err := opts.Bucket.GetMaxVbno()
if err != nil {
return errors.Wrapf(err, "Unable to retrieve maxVbNo for bucket %s", MD(opts.Bucket.GetName()).Redact())
return RedactErrorf("Unable to retrieve maxVbNo for bucket %s: %w", MD(opts.Bucket.GetName()), err)
}

numPartitions := opts.NumPartitions
Expand Down Expand Up @@ -308,7 +309,7 @@ func getCBGTIndexUUID(manager *cbgt.Manager, indexName string) (previousUUID str

_, indexDefsMap, err := manager.GetIndexDefs(true)
if err != nil {
return "", errors.Wrapf(err, "Error calling CBGT GetIndexDefs() on index: %s", indexName)
return "", fmt.Errorf("Error calling CBGT GetIndexDefs() on index: %s: %w", indexName, err)
}

indexDef, ok := indexDefsMap[indexName]
Expand Down Expand Up @@ -920,3 +921,19 @@ func (meh *sgMgrEventHandlers) OnFeedError(_ string, r cbgt.Feed, feedErr error)
dcpFeed.NotifyMgrOnClose()
}
}

// PurgeShardedDCPCheckpoints removes the checkpoints created by a cbgt based DCP feed.
//
// One checkpoint document per vBucket is deleted from datastore, named
// "<checkpointPrefix>_<vbNo>" for vbNo in [0, vbCount). Checkpoints that are
// already absent are ignored; all other deletion failures are collected and
// returned joined, so one bad vBucket does not abort the purge of the rest.
func PurgeShardedDCPCheckpoints(ctx context.Context, datastore DataStore, vbCount uint16, checkpointPrefix string) error {
	var errs []error
	for vbNo := range vbCount {
		checkpointID := fmt.Sprintf("%s_%d", checkpointPrefix, vbNo)
		err := datastore.Delete(checkpointID)
		if err != nil && !IsDocNotFoundError(err) {
			errs = append(errs, fmt.Errorf("error deleting checkpoint %s: %w", checkpointID, err))
		}
	}
	// errors.Join returns nil for an empty slice, so no explicit nil check is needed.
	return errors.Join(errs...)
}
134 changes: 90 additions & 44 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
return nil
}

if statusDoc.ResyncID != "" {
err := r.purgeCheckpoints(ctx, db, statusDoc.ResyncID)
if err != nil {
base.WarnfCtx(ctx, "Failed to delete checkpoints for previous resync ID %q: %v, these will be abandoned and unused", statusDoc.ResyncID, err)
}
}

newID, err := uuid.NewRandom()
if err != nil {
return err
Expand All @@ -112,52 +119,37 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
return nil
}

// purgeCheckpoints removes checkpoints for a given resyncID.
func (r *ResyncManagerDCP) purgeCheckpoints(ctx context.Context, db *Database, resyncID string) error {
if resyncID == "" {
return errors.New("resyncID is required to delete checkpoints")
}
checkpointPrefix := GetResyncDCPCheckpointPrefix(db.DatabaseContext, resyncID, r.Distributed)
if !r.Distributed {
dcpClient, err := r.newDCPClient(ctx, db, false)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newDCPClient seems like it does a decent amount of setup work (including loading and initializing metadata from the checkpoints we're about to delete). Is this just to handle the differences in purge between Rosmar and Gocb implementations?

Any way of having an equivalent function to base.PurgeShardedDCPCheckpoints for non-sharded feeds that accepts a MetadataStore and numWorkers?

if err != nil {
return fmt.Errorf("error creating DCP client to delete checkpoints for resync ID %q: %v", resyncID, err)
}
return dcpClient.PurgeCheckpoints()
}
vbCount, err := db.Bucket.GetMaxVbno()
if err != nil {
return fmt.Errorf("error getting vb count for checkpoint deletion: %v", err)
}
return base.PurgeShardedDCPCheckpoints(ctx, db.MetadataStore, vbCount, checkpointPrefix)
}

// SetVBUUIDs replaces the manager's stored vBucket UUIDs, guarded by the
// manager's write lock.
func (r *ResyncManagerDCP) SetVBUUIDs(vbuuids []uint64) {
	r.lock.Lock()
	r.VBUUIDs = vbuuids
	r.lock.Unlock()
}

// Run starts a DCP feed to process documents for resync.
func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) (err error) {
db, ok := options["database"].(*Database)
if !ok {
return errors.New("database option is required and must be of type *Database")
}
regenerateSequences, ok := options["regenerateSequences"].(bool)
if !ok {
return errors.New("regenerateSequences option is required and must be of type bool")
}
resyncCollections, ok := options["collections"].(base.CollectionNames)
if !ok {
return errors.New("collections option is required and must be of type CollectionNames")
}
ctx = context.WithoutCancel(ctx) // drop cancellation from parent context
ctx = base.CorrelationIDLogCtx(ctx, r.ResyncID)
ctx, cancelResync := context.WithCancelCause(ctx)
defer func() {
if err != nil {
cancelResync(err)
} else {
cancelResync(errors.New("resync ended normally"))
}
}()

var doneChan chan error
var dcpClient base.DCPClient

persistClusterStatus := func() {
err := persistClusterStatusCallback(ctx)
if err != nil {
base.WarnfCtx(ctx, "Failed to persist cluster status on-demand for resync operation: %v", err)
}
}
defer persistClusterStatus()

defer atomic.CompareAndSwapUint32(&db.State, DBResyncing, DBOffline)

callback := func(event sgbucket.FeedEvent) bool {
// getDCPCallback returns function to process DCP events for resync.
func (r *ResyncManagerDCP) getDCPCallback(ctx context.Context, db *Database, regenerateSequences bool) sgbucket.FeedEventCallbackFunc {
return func(event sgbucket.FeedEvent) bool {
ctx := ctx // copy ctx so it doesn't get modified by multiple copies of this function running simultaneously
docID := string(event.Key)
base.TracefCtx(ctx, base.KeyAll, "Resync: Received DCP event %d for doc %v", event.Opcode, base.UD(docID))

Expand All @@ -181,7 +173,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
db.DbStats.Database().ResyncNumProcessed.Add(1)
databaseCollection := db.CollectionByID[event.CollectionID]
databaseCollection.collectionStats.ResyncNumProcessed.Add(1)
ctx := databaseCollection.AddCollectionContext(ctx)
ctx = databaseCollection.AddCollectionContext(ctx)
doc, err := bucketDocumentFromFeed(event)
if err != nil {
base.WarnfCtx(ctx, "Resync: Error getting document from DCP event for doc %q: %v", base.UD(docID), err)
Expand All @@ -201,6 +193,62 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
}
return true
}
}

// newDCPClient creates a DCP client for resync with the appropriate options and callback.
func (r *ResyncManagerDCP) newDCPClient(ctx context.Context, db *Database, regenerateSequences bool) (base.DCPClient, error) {
clientOptions := getResyncDCPClientOptions(
db.DatabaseContext,
r.ResyncID,
r.ResyncedCollections.ToCollectionNameSet(),
r.getDCPCallback(ctx, db, regenerateSequences),
r.Distributed,
)
dcpClient, err := base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions)
if err != nil {
base.WarnfCtx(ctx, "Failed to create resync DCP client! %v", err)
return nil, err
}
return dcpClient, nil
}

// Run starts a DCP feed to process documents for resync.
func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) (err error) {
db, ok := options["database"].(*Database)
if !ok {
return errors.New("database option is required and must be of type *Database")
}
regenerateSequences, ok := options["regenerateSequences"].(bool)
if !ok {
return errors.New("regenerateSequences option is required and must be of type bool")
}
resyncCollections, ok := options["collections"].(base.CollectionNames)
if !ok {
return errors.New("collections option is required and must be of type CollectionNames")
}
ctx = context.WithoutCancel(ctx) // drop cancellation from parent context
ctx = base.CorrelationIDLogCtx(ctx, r.ResyncID)
ctx, cancelResync := context.WithCancelCause(ctx)
defer func() {
if err != nil {
cancelResync(err)
} else {
cancelResync(errors.New("resync ended normally"))
}
}()

var doneChan chan error
var dcpClient base.DCPClient

persistClusterStatus := func() {
err := persistClusterStatusCallback(ctx)
if err != nil {
base.WarnfCtx(ctx, "Failed to persist cluster status on-demand for resync operation: %v", err)
}
}
defer persistClusterStatus()

defer atomic.CompareAndSwapUint32(&db.State, DBResyncing, DBOffline)

if r.hasAllCollections {
base.InfofCtx(ctx, base.KeyAll, "running resync against all collections")
Expand Down Expand Up @@ -230,7 +278,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
checkPointPrefix := GetResyncDCPCheckpointPrefix(db.DatabaseContext, r.ResyncID, true)

resyncDestFunc := func(janitorRollback func()) (cbgt.Dest, error) {
resyncDest, err := base.NewDCPDest(ctx, callback, db.MetadataStore, db.numVBuckets, true, nil, nil, checkPointPrefix)
resyncDest, err := base.NewDCPDest(ctx, r.getDCPCallback(ctx, db, regenerateSequences), db.MetadataStore, db.numVBuckets, true, nil, nil, checkPointPrefix)
if err != nil {
return nil, fmt.Errorf("Error creating resync dest: %v", err)
}
Expand Down Expand Up @@ -282,9 +330,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
defer resyncCbgtContext.Stop(ctx)
} else {

clientOptions := getResyncDCPClientOptions(db.DatabaseContext, r.ResyncID, r.ResyncedCollections.ToCollectionNameSet(), callback, false)
var err error
dcpClient, err = base.NewDCPClient(ctx, db.DatabaseContext.Bucket, clientOptions)
dcpClient, err = r.newDCPClient(ctx, db, regenerateSequences)
if err != nil {
base.WarnfCtx(ctx, "Failed to create resync DCP client! %v", err)
return err
Expand Down
Loading