Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions base/constants_syncdocs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const MetadataIdPrefix = "m_" // Prefix for met
const SyncDocMetadataPrefix = SyncDocPrefix + MetadataIdPrefix // Prefix for all namespaced Sync Gateway metadata documents
const DCPCheckpointPrefix = "dcp_ck:"

const minimumAttachmentMigrationMetadataVersion = "4.0.0" // minimum metadata version that needs to be defined for metadata migration.

// Sync Gateway Metadata document types
type metadataKey int

Expand Down Expand Up @@ -378,8 +380,8 @@ func CollectionSyncFunctionKeyWithGroupID(groupID string, scopeName, collectionN

// SyncInfo documents are stored in collections to identify the metadataID associated with sync metadata in that collection
type SyncInfo struct {
MetadataID string `json:"metadataID,omitempty"`
MetaDataVersion string `json:"metadata_version,omitempty"`
MetadataID *string `json:"metadataID,omitempty"`
MetaDataVersion string `json:"metadata_version,omitempty"`
}

// initSyncInfo attempts to initialize syncInfo for a datastore
Expand All @@ -395,7 +397,7 @@ func InitSyncInfo(ctx context.Context, ds DataStore, metadataID string) (require
if metadataID == "" {
return false, true, nil
}
newSyncInfo := &SyncInfo{MetadataID: metadataID}
newSyncInfo := &SyncInfo{MetadataID: Ptr(metadataID)}
_, addErr := ds.Add(SGSyncInfo, 0, newSyncInfo)
if IsCasMismatch(addErr) {
// attempt new fetch
Expand All @@ -406,22 +408,33 @@ func InitSyncInfo(ctx context.Context, ds DataStore, metadataID string) (require
} else if addErr != nil {
return true, true, fmt.Errorf("Error adding syncInfo: %v", addErr)
}
// successfully added
requiresResync = syncInfo.requiresResync(metadataID)
requiresAttachmentMigration, err = CompareMetadataVersion(ctx, syncInfo.MetaDataVersion)
if err != nil {
return syncInfo.MetadataID != metadataID, true, err
return requiresResync, true, err
}
return false, requiresAttachmentMigration, nil
return requiresResync, requiresAttachmentMigration, nil
} else if fetchErr != nil {
return true, true, fmt.Errorf("Error retrieving syncInfo: %v", fetchErr)
}
requiresResync = syncInfo.requiresResync(metadataID)
// check for meta version, if we don't have meta version of 4.0 we need to run migration job
requiresAttachmentMigration, err = CompareMetadataVersion(ctx, syncInfo.MetaDataVersion)
if err != nil {
return syncInfo.MetadataID != metadataID, true, err
return requiresResync, true, err
}

return syncInfo.MetadataID != metadataID, requiresAttachmentMigration, nil
return requiresResync, requiresAttachmentMigration, nil
}

// requiresResync determines if a given SyncInfo document represents a collection requiring resync.
func (s *SyncInfo) requiresResync(metadataID string) bool {
	// A nil MetadataID means the document was empty (pre-collections) or was only
	// written by attachment migration; either way the collection has a single
	// associated database and no resync is needed. Otherwise, resync is required
	// exactly when the stored ID differs from the requested one.
	return s.MetadataID != nil && *s.MetadataID != metadataID
}

// SetSyncInfoMetadataID sets syncInfo in a DataStore to the specified metadataID, preserving metadata version if present
Expand All @@ -440,7 +453,7 @@ func SetSyncInfoMetadataID(ds DataStore, metadataID string) error {
}
}
// if we have a metadataID to set, set it preserving the metadata version if present
syncInfo.MetadataID = metadataID
syncInfo.MetadataID = Ptr(metadataID)
bytes, err := JSONMarshal(&syncInfo)
return bytes, nil, false, err
})
Expand Down Expand Up @@ -496,7 +509,7 @@ func CheckRequireAttachmentMigration(ctx context.Context, version *ComparableBui
AssertfCtx(ctx, "failed to build comparable build version for syncInfo metaVersion")
return true, fmt.Errorf("corrupt syncInfo metaVersion value")
}
minVerStr := "4.0.0" // minimum meta version that needs to be defined for metadata migration. Any version less than this will require attachment migration
minVerStr := minimumAttachmentMigrationMetadataVersion
minVersion, err := NewComparableBuildVersionFromString(minVerStr)
if err != nil {
AssertfCtx(ctx, "failed to build comparable build version for minimum version for attachment migration")
Expand Down
179 changes: 179 additions & 0 deletions base/constants_syncdocs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ package base
import (
"fmt"
"strings"
"sync/atomic"
"testing"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -117,3 +119,180 @@ func TestDCPMetadataKeyUpgrade(t *testing.T) {
versionPrefix = customMetadataKeys.DCPVersionedCheckpointPrefix("", 0)
require.Equal(t, nonVersionedPrefix, versionPrefix)
}

// TestInitSyncInfoErrors exercises the error and CAS-retry paths of InitSyncInfo
// by injecting failures into DataStore.Add through a leaky data store's
// AddCallback, then asserting the returned requiresResync /
// requiresAttachmentMigration flags and any error.
func TestInitSyncInfoErrors(t *testing.T) {
	ctx := TestCtx(t)
	bucket := GetTestBucket(t)
	defer bucket.Close(ctx)

	ds, ok := AsLeakyDataStore(NewLeakyBucket(bucket, LeakyBucketConfig{}).DefaultDataStore())
	require.True(t, ok, "expected leaky bucket to return a leaky data store")

	// shouldFailAdd lets each callback fail exactly once (via CompareAndSwap)
	// before allowing the subsequent Add to proceed.
	shouldFailAdd := atomic.Bool{}
	expectedMetadataID := "metadataID"

	// Walrus and a real Couchbase Server backend report a missing document with
	// different error text.
	missingErrorMsg := "missing"
	if !UnitTestUrlIsWalrus() {
		missingErrorMsg = "not found"
	}
	testCases := []struct {
		name                        string
		expectedError               string // substring expected in the returned error; "" means no error expected
		requiresResync              bool   // expected first return value of InitSyncInfo
		requiresAttachmentMigration bool   // expected second return value of InitSyncInfo
		addCallback                 func(docID string) (bool, error)
	}{
		{
			// Add fails with a non-CAS error: InitSyncInfo should surface it and
			// conservatively report both resync and migration required.
			name:                        "generic error",
			requiresResync:              true,
			requiresAttachmentMigration: true,
			addCallback: func(docID string) (bool, error) {
				return false, fmt.Errorf("generic error")
			},
			expectedError: "generic error",
		},
		{
			// CAS mismatch but no competing document was actually written, so the
			// follow-up fetch fails with a doc-not-found error.
			name:                        "single cas error, then empty",
			requiresResync:              true,
			requiresAttachmentMigration: true,
			addCallback: func(docID string) (bool, error) {
				return false, sgbucket.CasMismatchErr{}
			},
			expectedError: missingErrorMsg,
		},
		{
			// First Add loses a race to a concurrent writer whose doc matches our
			// metadataID but has no metadata version: no resync, but migration needed.
			name:                        "single cas error, get replacement with metadataID=match, no metadataVersion",
			requiresResync:              false,
			requiresAttachmentMigration: true,
			addCallback: func(docID string) (bool, error) {
				if shouldFailAdd.CompareAndSwap(false, true) {
					newSyncInfo := &SyncInfo{MetadataID: Ptr(expectedMetadataID)}
					added, err := ds.Add(docID, 0, newSyncInfo)
					require.True(t, added)
					require.NoError(t, err)
					return false, sgbucket.CasMismatchErr{}
				}
				return false, nil
			},
			expectedError: "",
		},
		{
			// Concurrent writer stored a different metadataID: resync required.
			name:                        "single cas error, get replacement with metadataID=mismatch, no metadataVersion",
			requiresResync:              true,
			requiresAttachmentMigration: true,
			addCallback: func(docID string) (bool, error) {
				if shouldFailAdd.CompareAndSwap(false, true) {
					newSyncInfo := &SyncInfo{
						MetadataID: Ptr("another metadataID"),
					}
					added, err := ds.Add(docID, 0, newSyncInfo)
					require.True(t, added)
					require.NoError(t, err)
					return false, sgbucket.CasMismatchErr{}
				}
				return false, nil
			},
			expectedError: "",
		},
		{
			// Matching metadataID and an up-to-date metadata version: nothing to do.
			name:                        "single cas error, get replacement with metadataID=match, correct metadataVersion",
			requiresResync:              false,
			requiresAttachmentMigration: false,
			addCallback: func(docID string) (bool, error) {
				if shouldFailAdd.CompareAndSwap(false, true) {
					newSyncInfo := &SyncInfo{
						MetadataID:      Ptr(expectedMetadataID),
						MetaDataVersion: minimumAttachmentMigrationMetadataVersion,
					}
					added, err := ds.Add(docID, 0, newSyncInfo)
					require.True(t, added)
					require.NoError(t, err)
					return false, sgbucket.CasMismatchErr{}
				}
				return false, nil
			},
			expectedError: "",
		},
		{
			// Mismatched metadataID with current metadata version: resync only.
			name:                        "single cas error, get replacement with metadataID=mismatch, correct metadataVersion",
			requiresResync:              true,
			requiresAttachmentMigration: false,
			addCallback: func(docID string) (bool, error) {
				if shouldFailAdd.CompareAndSwap(false, true) {
					newSyncInfo := &SyncInfo{
						MetadataID:      Ptr("another metadataID"),
						MetaDataVersion: minimumAttachmentMigrationMetadataVersion,
					}
					added, err := ds.Add(docID, 0, newSyncInfo)
					require.True(t, added)
					require.NoError(t, err)
					return false, sgbucket.CasMismatchErr{}
				}
				return false, nil
			},
			expectedError: "",
		},
		{
			// Matching metadataID but a pre-4.0 metadata version: migration only.
			name:                        "single cas error, get replacement with metadataID=match, incorrect metadataVersion",
			requiresResync:              false,
			requiresAttachmentMigration: true,
			addCallback: func(docID string) (bool, error) {
				if shouldFailAdd.CompareAndSwap(false, true) {
					newSyncInfo := &SyncInfo{
						MetadataID:      Ptr(expectedMetadataID),
						MetaDataVersion: "3.0.0",
					}
					added, err := ds.Add(docID, 0, newSyncInfo)
					require.True(t, added)
					require.NoError(t, err)
					return false, sgbucket.CasMismatchErr{}
				}
				return false, nil
			},
			expectedError: "",
		},
		{
			// Mismatched metadataID and stale metadata version: both flags set.
			name:                        "single cas error, get replacement with metadataID=mismatch, incorrect metadataVersion",
			requiresResync:              true,
			requiresAttachmentMigration: true,
			addCallback: func(docID string) (bool, error) {
				if shouldFailAdd.CompareAndSwap(false, true) {
					newSyncInfo := &SyncInfo{
						MetadataID:      Ptr("another metadataID"),
						MetaDataVersion: "3.0.0",
					}
					added, err := ds.Add(docID, 0, newSyncInfo)
					require.True(t, added)
					require.NoError(t, err)
					return false, sgbucket.CasMismatchErr{}
				}
				return false, nil
			},
			expectedError: "",
		},
	}
	for _, test := range testCases {
		t.Run(test.name, func(t *testing.T) {
			// Clean up the syncInfo document between subtests; some cases never
			// write it, so a not-found error on delete is acceptable.
			defer func() {
				err := ds.Delete(SGSyncInfo)
				if err != nil {
					RequireDocNotFoundError(t, err)
					return
				}
				assert.NoError(t, err)
			}()
			shouldFailAdd.Store(false)
			ds.config.AddCallback = test.addCallback
			requiresResync, requiresAttachmentMigration, err := InitSyncInfo(ctx, ds, expectedMetadataID)
			if test.expectedError != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), test.expectedError)
			} else {
				require.NoError(t, err)
			}
			require.Equal(t, test.requiresResync, requiresResync, "Expected requiresResync to be %t", test.requiresResync)
			require.Equal(t, test.requiresAttachmentMigration, requiresAttachmentMigration, "Expected requiresAttachmentMigration to be %t", test.requiresAttachmentMigration)
		})
	}
}
3 changes: 3 additions & 0 deletions base/leaky_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ type LeakyBucketConfig struct {
// TouchCallback issues a callback during touch.
TouchCallback func(key string) error

// AddCallback issues a callback during Add.
AddCallback func(key string) (bool, error)

// When IgnoreClose is set to true, bucket.Close() is a no-op. Used when multiple references to a bucket are active.
IgnoreClose bool
}
Expand Down
6 changes: 6 additions & 0 deletions base/leaky_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (lds *LeakyDataStore) Touch(k string, exp uint32) (cas uint64, err error) {
return lds.dataStore.Touch(k, exp)
}
// Add writes a document through the underlying data store. When the leaky
// config provides an AddCallback, it is invoked first with the key: a non-nil
// callback error is returned immediately (along with the callback's added
// result) and the real Add is skipped; otherwise the callback's boolean is
// discarded and the Add proceeds normally.
func (lds *LeakyDataStore) Add(k string, exp uint32, v interface{}) (added bool, err error) {
	cb := lds.config.AddCallback
	if cb == nil {
		return lds.dataStore.Add(k, exp, v)
	}
	if cbAdded, cbErr := cb(k); cbErr != nil {
		return cbAdded, cbErr
	}
	return lds.dataStore.Add(k, exp, v)
}
func (lds *LeakyDataStore) AddRaw(k string, exp uint32, v []byte) (added bool, err error) {
Expand Down
39 changes: 26 additions & 13 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4678,8 +4678,10 @@ func TestSettingSyncInfo(t *testing.T) {
var syncInfo base.SyncInfo
_, err := ds.Get(base.SGSyncInfo, &syncInfo)
require.NoError(t, err)
assert.Equal(t, "1", syncInfo.MetaDataVersion)
assert.Equal(t, "someID", syncInfo.MetadataID)
require.Equal(t, base.SyncInfo{
MetaDataVersion: "1",
MetadataID: base.Ptr("someID"),
}, syncInfo)

// remove sync info to test another permutation
require.NoError(t, ds.Delete(base.SGSyncInfo))
Expand All @@ -4691,45 +4693,51 @@ func TestSettingSyncInfo(t *testing.T) {
syncInfo = base.SyncInfo{}
_, err = ds.Get(base.SGSyncInfo, &syncInfo)
require.NoError(t, err)
assert.Equal(t, "1", syncInfo.MetaDataVersion)
assert.Equal(t, "someID", syncInfo.MetadataID)
require.Equal(t, base.SyncInfo{
MetaDataVersion: "1",
MetadataID: base.Ptr("someID"),
}, syncInfo)

// test updating each element in sync info now both elements are defined
require.NoError(t, base.SetSyncInfoMetaVersion(ds, "4"))
_, err = ds.Get(base.SGSyncInfo, &syncInfo)
require.NoError(t, err)
assert.Equal(t, "4", syncInfo.MetaDataVersion)
assert.Equal(t, "someID", syncInfo.MetadataID)
require.Equal(t, base.SyncInfo{
MetaDataVersion: "4",
MetadataID: base.Ptr("someID"),
}, syncInfo)

require.NoError(t, base.SetSyncInfoMetadataID(ds, "test"))
_, err = ds.Get(base.SGSyncInfo, &syncInfo)
require.NoError(t, err)
assert.Equal(t, "4", syncInfo.MetaDataVersion)
assert.Equal(t, "test", syncInfo.MetadataID)
require.Equal(t, base.SyncInfo{
MetaDataVersion: "4",
MetadataID: base.Ptr("test"),
}, syncInfo)
}

// TestRequireMigration:
// - Purpose is to test code pathways inside the InitSyncInfo function will return requires attachment migration
// as expected.
func TestRequireMigration(t *testing.T) {
func TestInitSyncInfo(t *testing.T) {
type testCase struct {
name string
initialMetaID string
newMetadataID string
metaVersion string
requireResync bool
requireMigration bool
}
testCases := []testCase{
{
name: "sync info in bucket with metadataID set",
initialMetaID: "someID",
requireMigration: true,
requireResync: true,
},
{
name: "sync info in bucket with metadataID set, set newMetadataID",
initialMetaID: "someID",
newMetadataID: "testID",
requireMigration: true,
requireResync: true,
},
{
name: "correct metaversion already defined, no metadata ID to set",
Expand Down Expand Up @@ -4767,13 +4775,18 @@ func TestRequireMigration(t *testing.T) {
require.NoError(t, base.SetSyncInfoMetaVersion(ds, testcase.metaVersion))
}

_, requireMigration, err := base.InitSyncInfo(ctx, ds, testcase.newMetadataID)
requireResync, requireMigration, err := base.InitSyncInfo(ctx, ds, testcase.newMetadataID)
require.NoError(t, err)
if testcase.requireMigration {
assert.True(t, requireMigration)
} else {
assert.False(t, requireMigration)
}
if testcase.requireResync {
assert.True(t, requireResync)
} else {
assert.False(t, requireResync)
}

// cleanup bucket
require.NoError(t, ds.Delete(base.SGSyncInfo))
Expand Down
Loading
Loading