diff --git a/base/constants_syncdocs.go b/base/constants_syncdocs.go index 7e3a257906..32050a4018 100644 --- a/base/constants_syncdocs.go +++ b/base/constants_syncdocs.go @@ -22,6 +22,8 @@ const MetadataIdPrefix = "m_" // Prefix for met const SyncDocMetadataPrefix = SyncDocPrefix + MetadataIdPrefix // Prefix for all namespaced Sync Gateway metadata documents const DCPCheckpointPrefix = "dcp_ck:" +const minimumAttachmentMigrationMetadataVersion = "4.0.0" // minimum metadata version that needs to be defined for metadata migration. + // Sync Gateway Metadata document types type metadataKey int @@ -378,8 +380,8 @@ func CollectionSyncFunctionKeyWithGroupID(groupID string, scopeName, collectionN // SyncInfo documents are stored in collections to identify the metadataID associated with sync metadata in that collection type SyncInfo struct { - MetadataID string `json:"metadataID,omitempty"` - MetaDataVersion string `json:"metadata_version,omitempty"` + MetadataID *string `json:"metadataID,omitempty"` + MetaDataVersion string `json:"metadata_version,omitempty"` } // initSyncInfo attempts to initialize syncInfo for a datastore @@ -395,7 +397,7 @@ func InitSyncInfo(ctx context.Context, ds DataStore, metadataID string) (require if metadataID == "" { return false, true, nil } - newSyncInfo := &SyncInfo{MetadataID: metadataID} + newSyncInfo := &SyncInfo{MetadataID: Ptr(metadataID)} _, addErr := ds.Add(SGSyncInfo, 0, newSyncInfo) if IsCasMismatch(addErr) { // attempt new fetch @@ -406,22 +408,33 @@ func InitSyncInfo(ctx context.Context, ds DataStore, metadataID string) (require } else if addErr != nil { return true, true, fmt.Errorf("Error adding syncInfo: %v", addErr) } - // successfully added + requiresResync = syncInfo.requiresResync(metadataID) requiresAttachmentMigration, err = CompareMetadataVersion(ctx, syncInfo.MetaDataVersion) if err != nil { - return syncInfo.MetadataID != metadataID, true, err + return requiresResync, true, err } - return false, requiresAttachmentMigration, nil + return requiresResync, requiresAttachmentMigration, nil } else if fetchErr != nil { return true, true, fmt.Errorf("Error retrieving syncInfo: %v", fetchErr) } + requiresResync = syncInfo.requiresResync(metadataID) // check for meta version, if we don't have meta version of 4.0 we need to run migration job requiresAttachmentMigration, err = CompareMetadataVersion(ctx, syncInfo.MetaDataVersion) if err != nil { - return syncInfo.MetadataID != metadataID, true, err + return requiresResync, true, err } - return syncInfo.MetadataID != metadataID, requiresAttachmentMigration, nil + return requiresResync, requiresAttachmentMigration, nil +} + +// requiresResync determines if a given SyncInfo document represents a collection requiring resync. +func (s *SyncInfo) requiresResync(metadataID string) bool { + // if metadataID is not set, then the document was empty (pre collections) or it only ran attachment migration. Either + // way, this means the associated collection only has a single database associated and doesn't need resync. + if s.MetadataID == nil { + return false + } + return *s.MetadataID != metadataID } // SetSyncInfoMetadataID sets syncInfo in a DataStore to the specified metadataID, preserving metadata version if present @@ -440,7 +453,7 @@ func SetSyncInfoMetadataID(ds DataStore, metadataID string) error { } } // if we have a metadataID to set, set it preserving the metadata version if present - syncInfo.MetadataID = metadataID + syncInfo.MetadataID = Ptr(metadataID) bytes, err := JSONMarshal(&syncInfo) return bytes, nil, false, err }) @@ -496,7 +509,7 @@ func CheckRequireAttachmentMigration(ctx context.Context, version *ComparableBui AssertfCtx(ctx, "failed to build comparable build version for syncInfo metaVersion") return true, fmt.Errorf("corrupt syncInfo metaVersion value") } - minVerStr := "4.0.0" // minimum meta version that needs to be defined for metadata migration. Any version less than this will require attachment migration + minVerStr := minimumAttachmentMigrationMetadataVersion minVersion, err := NewComparableBuildVersionFromString(minVerStr) if err != nil { AssertfCtx(ctx, "failed to build comparable build version for minimum version for attachment migration") diff --git a/base/constants_syncdocs_test.go b/base/constants_syncdocs_test.go index 40f6c3c593..b71533213b 100644 --- a/base/constants_syncdocs_test.go +++ b/base/constants_syncdocs_test.go @@ -11,8 +11,10 @@ package base import ( "fmt" "strings" + "sync/atomic" "testing" + sgbucket "github.com/couchbase/sg-bucket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -117,3 +119,180 @@ func TestDCPMetadataKeyUpgrade(t *testing.T) { versionPrefix = customMetadataKeys.DCPVersionedCheckpointPrefix("", 0) require.Equal(t, nonVersionedPrefix, versionPrefix) } + +func TestInitSyncInfoErrors(t *testing.T) { + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + ds, ok := AsLeakyDataStore(NewLeakyBucket(bucket, LeakyBucketConfig{}).DefaultDataStore()) + require.True(t, ok, "expected leaky bucket to return a leaky data store") + + shouldFailAdd := atomic.Bool{} + expectedMetadataID := "metadataID" + + missingErrorMsg := "missing" + if !UnitTestUrlIsWalrus() { + missingErrorMsg = "not found" + } + testCases := []struct { + name string + expectedError string + requiresResync bool + requiresAttachmentMigration bool + addCallback func(docID string) (bool, error) + }{ + { + + name: "generic error", + requiresResync: true, + requiresAttachmentMigration: true, + addCallback: func(docID string) (bool, error) { + return false, fmt.Errorf("generic error") + }, + expectedError: "generic error", + }, + { + name: "single cas error, then empty", + requiresResync: true, + requiresAttachmentMigration: true, + addCallback: func(docID string) (bool, error) { + return false, sgbucket.CasMismatchErr{} + }, + expectedError: missingErrorMsg, + }, + { + name: "single cas error, get replacement with metadataID=match, no metadataVersion", + requiresResync: false, + requiresAttachmentMigration: true, + addCallback: func(docID string) (bool, error) { + if shouldFailAdd.CompareAndSwap(false, true) { + newSyncInfo := &SyncInfo{MetadataID: Ptr(expectedMetadataID)} + added, err := ds.Add(docID, 0, newSyncInfo) + require.True(t, added) + require.NoError(t, err) + return false, sgbucket.CasMismatchErr{} + } + return false, nil + }, + expectedError: "", + }, + { + name: "single cas error, get replacement with metadataID=mismatch, no metadataVersion", + requiresResync: true, + requiresAttachmentMigration: true, + addCallback: func(docID string) (bool, error) { + if shouldFailAdd.CompareAndSwap(false, true) { + newSyncInfo := &SyncInfo{ + MetadataID: Ptr("another metadataID"), + } + added, err := ds.Add(docID, 0, newSyncInfo) + require.True(t, added) + require.NoError(t, err) + return false, sgbucket.CasMismatchErr{} + } + return false, nil + }, + expectedError: "", + }, + { + name: "single cas error, get replacement with metadataID=match, correct metadataVersion", + requiresResync: false, + requiresAttachmentMigration: false, + addCallback: func(docID string) (bool, error) { + if shouldFailAdd.CompareAndSwap(false, true) { + newSyncInfo := &SyncInfo{ + MetadataID: Ptr(expectedMetadataID), + MetaDataVersion: minimumAttachmentMigrationMetadataVersion, + } + added, err := ds.Add(docID, 0, newSyncInfo) + require.True(t, added) + require.NoError(t, err) + return false, sgbucket.CasMismatchErr{} + } + return false, nil + }, + expectedError: "", + }, + { + name: "single cas error, get replacement with metadataID=mismatch, correct metadataVersion", + requiresResync: true, + requiresAttachmentMigration: false, + addCallback: func(docID string) (bool, error) { + if shouldFailAdd.CompareAndSwap(false, true) { + newSyncInfo := &SyncInfo{ + MetadataID: Ptr("another metadataID"), + MetaDataVersion: minimumAttachmentMigrationMetadataVersion, + } + added, err := ds.Add(docID, 0, newSyncInfo) + require.True(t, added) + require.NoError(t, err) + return false, sgbucket.CasMismatchErr{} + } + return false, nil + }, + expectedError: "", + }, + { + name: "single cas error, get replacement with metadataID=match, incorrect metadataVersion", + requiresResync: false, + requiresAttachmentMigration: true, + addCallback: func(docID string) (bool, error) { + if shouldFailAdd.CompareAndSwap(false, true) { + newSyncInfo := &SyncInfo{ + MetadataID: Ptr(expectedMetadataID), + MetaDataVersion: "3.0.0", + } + added, err := ds.Add(docID, 0, newSyncInfo) + require.True(t, added) + require.NoError(t, err) + return false, sgbucket.CasMismatchErr{} + } + return false, nil + }, + expectedError: "", + }, + { + name: "single cas error, get replacement with metadataID=mismatch, incorrect metadataVersion", + requiresResync: true, + requiresAttachmentMigration: true, + addCallback: func(docID string) (bool, error) { + if shouldFailAdd.CompareAndSwap(false, true) { + newSyncInfo := &SyncInfo{ + MetadataID: Ptr("another metadataID"), + MetaDataVersion: "3.0.0", + } + added, err := ds.Add(docID, 0, newSyncInfo) + require.True(t, added) + require.NoError(t, err) + return false, sgbucket.CasMismatchErr{} + } + return false, nil + }, + expectedError: "", + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + defer func() { + err := ds.Delete(SGSyncInfo) + if err != nil { + RequireDocNotFoundError(t, err) + return + } + assert.NoError(t, err) + }() + shouldFailAdd.Store(false) + ds.config.AddCallback = test.addCallback + requiresResync, requiresAttachmentMigration, err := InitSyncInfo(ctx, ds, expectedMetadataID) + if test.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), test.expectedError) + } else { + require.NoError(t, err) + } + require.Equal(t, test.requiresResync, requiresResync, "Expected requiresResync to be %t", test.requiresResync) + require.Equal(t, test.requiresAttachmentMigration, requiresAttachmentMigration, "Expected requiresAttachmentMigration to be %t", test.requiresAttachmentMigration) + }) + } +} diff --git a/base/leaky_bucket.go b/base/leaky_bucket.go index bef3e43c70..c33c2c5861 100644 --- a/base/leaky_bucket.go +++ b/base/leaky_bucket.go @@ -156,6 +156,9 @@ type LeakyBucketConfig struct { // TouchCallback issues a callback during touch. TouchCallback func(key string) error + // AddCallback issues a callback during Add. + AddCallback func(key string) (bool, error) + // When IgnoreClose is set to true, bucket.Close() is a no-op. Used when multiple references to a bucket are active. IgnoreClose bool } diff --git a/base/leaky_datastore.go b/base/leaky_datastore.go index cd809cb57e..3907e2e244 100644 --- a/base/leaky_datastore.go +++ b/base/leaky_datastore.go @@ -128,6 +128,12 @@ func (lds *LeakyDataStore) Touch(k string, exp uint32) (cas uint64, err error) { return lds.dataStore.Touch(k, exp) } func (lds *LeakyDataStore) Add(k string, exp uint32, v interface{}) (added bool, err error) { + if lds.config.AddCallback != nil { + added, err := lds.config.AddCallback(k) + if err != nil { + return added, err + } + } return lds.dataStore.Add(k, exp, v) } func (lds *LeakyDataStore) AddRaw(k string, exp uint32, v []byte) (added bool, err error) { diff --git a/db/database_test.go b/db/database_test.go index f57c722490..103944037f 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -4678,8 +4678,10 @@ func TestSettingSyncInfo(t *testing.T) { var syncInfo base.SyncInfo _, err := ds.Get(base.SGSyncInfo, &syncInfo) require.NoError(t, err) - assert.Equal(t, "1", syncInfo.MetaDataVersion) - assert.Equal(t, "someID", syncInfo.MetadataID) + require.Equal(t, base.SyncInfo{ + MetaDataVersion: "1", + MetadataID: base.Ptr("someID"), + }, syncInfo) // remove sync info to test another permutation require.NoError(t, ds.Delete(base.SGSyncInfo)) @@ -4691,32 +4693,36 @@ func TestSettingSyncInfo(t *testing.T) { syncInfo = base.SyncInfo{} _, err = ds.Get(base.SGSyncInfo, &syncInfo) require.NoError(t, err) - assert.Equal(t, "1", syncInfo.MetaDataVersion) - assert.Equal(t, "someID", syncInfo.MetadataID) + require.Equal(t, base.SyncInfo{ + MetaDataVersion: "1", + MetadataID: base.Ptr("someID"), + }, syncInfo) // test updating each element in sync info now both elements are defined require.NoError(t, base.SetSyncInfoMetaVersion(ds, "4")) _, err = ds.Get(base.SGSyncInfo, &syncInfo) require.NoError(t, err) - assert.Equal(t, "4", syncInfo.MetaDataVersion) - assert.Equal(t, "someID", syncInfo.MetadataID) + require.Equal(t, base.SyncInfo{ + MetaDataVersion: "4", + MetadataID: base.Ptr("someID"), + }, syncInfo) require.NoError(t, base.SetSyncInfoMetadataID(ds, "test")) _, err = ds.Get(base.SGSyncInfo, &syncInfo) require.NoError(t, err) - assert.Equal(t, "4", syncInfo.MetaDataVersion) - assert.Equal(t, "test", syncInfo.MetadataID) + require.Equal(t, base.SyncInfo{ + MetaDataVersion: "4", + MetadataID: base.Ptr("test"), + }, syncInfo) } -// TestRequireMigration: -// - Purpose is to test code pathways inside the InitSyncInfo function will return requires attachment migration -// as expected. -func TestRequireMigration(t *testing.T) { +func TestInitSyncInfo(t *testing.T) { type testCase struct { name string initialMetaID string newMetadataID string metaVersion string + requireResync bool requireMigration bool } testCases := []testCase{ @@ -4724,12 +4730,14 @@ func TestRequireMigration(t *testing.T) { name: "sync info in bucket with metadataID set", initialMetaID: "someID", requireMigration: true, + requireResync: true, }, { name: "sync info in bucket with metadataID set, set newMetadataID", initialMetaID: "someID", newMetadataID: "testID", requireMigration: true, + requireResync: true, }, { name: "correct metaversion already defined, no metadata ID to set", @@ -4767,13 +4775,18 @@ func TestRequireMigration(t *testing.T) { require.NoError(t, base.SetSyncInfoMetaVersion(ds, testcase.metaVersion)) } - _, requireMigration, err := base.InitSyncInfo(ctx, ds, testcase.newMetadataID) + requireResync, requireMigration, err := base.InitSyncInfo(ctx, ds, testcase.newMetadataID) require.NoError(t, err) if testcase.requireMigration { assert.True(t, requireMigration) } else { assert.False(t, requireMigration) } + if testcase.requireResync { + assert.True(t, requireResync) + } else { + assert.False(t, requireResync) + } // cleanup bucket require.NoError(t, ds.Delete(base.SGSyncInfo)) diff --git a/rest/config_manager.go b/rest/config_manager.go index ac08369fa5..39a2e29fb7 100644 --- a/rest/config_manager.go +++ b/rest/config_manager.go @@ -820,8 +820,8 @@ func (b *bootstrapContext) computeMetadataID(ctx context.Context, registry *Gate return standardMetadataID } - if exists && syncInfo.MetadataID != defaultMetadataID { - base.InfofCtx(ctx, base.KeyConfig, "Using metadata ID %q for db %q because db uses the default collection, and _sync:syncInfo in the default collection specifies the non-default metadata ID %q", base.MD(standardMetadataID), base.MD(config.Name), base.MD(syncInfo.MetadataID)) + if exists && (syncInfo.MetadataID != nil && *syncInfo.MetadataID != defaultMetadataID) { + base.InfofCtx(ctx, base.KeyConfig, "Using metadata ID %q for db %q because db uses the default collection, and _sync:syncInfo in the default collection specifies the non-default metadata ID %q", base.MD(standardMetadataID), base.MD(config.Name), base.MD(*syncInfo.MetadataID)) return standardMetadataID } diff --git a/rest/config_manager_test.go b/rest/config_manager_test.go index f653991d52..6240e59b5d 100644 --- a/rest/config_manager_test.go +++ b/rest/config_manager_test.go @@ -140,7 +140,7 @@ func TestComputeMetadataID(t *testing.T) { assert.Equal(t, standardMetadataID, metadataID) // Write syncInfo to default collection, indicating that default collection is already associated with a different database - docBody := []byte(`{"foo":"bar"}`) + docBody := []byte(`{"metadataID":"foo"}`) err = defaultStore.Set(base.SGSyncInfo, 0, nil, docBody) require.NoError(t, err) defaultDbConfig.Scopes = nil diff --git a/rest/upgradetest/upgrade_registry_test.go b/rest/upgradetest/upgrade_registry_test.go index 2c5f0368a2..2989b66a20 100644 --- a/rest/upgradetest/upgrade_registry_test.go +++ b/rest/upgradetest/upgrade_registry_test.go @@ -192,6 +192,8 @@ func TestLegacyMetadataID(t *testing.T) { resp := legacyRT.SendAdminRequest("PUT", "/db/testLegacyMetadataID", `{"test":"test"}`) rest.RequireStatus(t, resp, http.StatusCreated) + legacyRT.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) + dbConfigString := getDbConfigFromLegacyConfig(legacyRT) legacyRT.Close() @@ -206,7 +208,7 @@ func TestLegacyMetadataID(t *testing.T) { // check if database is online dbRoot := persistentRT.GetDatabaseRoot("db") - require.Equal(t, db.RunStateString[db.DBOnline], dbRoot.State) + require.Equal(t, db.RunStateString[db.DBOnline], dbRoot.State, "Database did not come online after upgrade %#+v", dbRoot) } // TestMetadataIDRenameDatabase verifies that resync is not required when deleting and recreating a database (with a @@ -257,6 +259,7 @@ func TestMetadataIDWithConfigGroups(t *testing.T) { resp := legacyRT.SendAdminRequest("PUT", "/db/testLegacyMetadataID", `{"test":"test"}`) assert.Equal(t, http.StatusCreated, resp.Code) + legacyRT.WaitForAttachmentMigrationStatus(t, db.BackgroundProcessStateCompleted) dbConfigString := getDbConfigFromLegacyConfig(legacyRT) legacyRT.Close() @@ -283,10 +286,10 @@ func TestMetadataIDWithConfigGroups(t *testing.T) { // check if databases are online dbRoot := group1RT.GetDatabaseRoot("db") - require.Equal(t, db.RunStateString[db.DBOnline], dbRoot.State) + require.Equal(t, db.RunStateString[db.DBOnline], dbRoot.State, "Database did not come online after upgrade %#+v", dbRoot) dbRoot = group2RT.GetDatabaseRoot("db") - require.Equal(t, db.RunStateString[db.DBOnline], dbRoot.State) + require.Equal(t, db.RunStateString[db.DBOnline], dbRoot.State, "Database did not come online after upgrade %#+v", dbRoot) } func requireBobUserLocation(rt *rest.RestTester, docName string) {