diff options
19 files changed, 628 insertions, 151 deletions
diff --git a/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js b/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js new file mode 100644 index 00000000000..e9d0208e9e3 --- /dev/null +++ b/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js @@ -0,0 +1,105 @@ +/** + * Test that tenant migration recipient rejects conflicting recipientSyncData commands. + * + * @tags: [requires_fcv_47, requires_majority_read_concern, incompatible_with_eft] + */ +(function() { + +"use strict"; +load("jstests/libs/parallel_shell_helpers.js"); +load("jstests/libs/curop_helpers.js"); // for waitForCurOpByFailPoint(). + +var rst = new ReplSetTest({nodes: 1, nodeOptions: {setParameter: {enableTenantMigrations: true}}}); +rst.startSet(); +rst.initiate(); + +const primary = rst.getPrimary(); +const configDB = primary.getDB("config"); +const tenantMigrationRecipientStateColl = configDB["tenantMigrationRecipients"]; +const tenantMigrationRecipientStateCollNss = tenantMigrationRecipientStateColl.getFullName(); + +const tenantId = "test"; +const connectionString = "foo/bar:12345"; +const readPreference = { + mode: 'primary' +}; + +function checkTenantMigrationRecipientStateCollCount(expectedCount) { + let res = tenantMigrationRecipientStateColl.find().toArray(); + assert.eq(expectedCount, + res.length, + "'config.tenantMigrationRecipients' collection count mismatch: " + tojson(res)); +} + +function startRecipientSyncDataCmd(migrationUuid, tenantId, connectionString, readPreference) { + jsTestLog("Starting a recipientSyncDataCmd for migrationUuid: " + migrationUuid + + " tenantId: '" + tenantId + "'"); + assert.commandWorkedOrFailedWithCode(db.adminCommand({ + recipientSyncData: 1, + migrationId: migrationUuid, + donorConnectionString: connectionString, + tenantId: tenantId, + readPreference: readPreference + }), + ErrorCodes.ConflictingOperationInProgress); +} + +// Enable the failpoint to stop the tenant migration after persisting the state doc. +assert.commandWorked(primary.adminCommand({ + configureFailPoint: "fpAfterPersistingTenantMigrationRecipientInstanceStateDoc", + mode: "alwaysOn", + data: {action: "stop"} +})); + +{ + // Enable the failpoint before inserting the state document by upsert command. + assert.commandWorked(primary.adminCommand( + {configureFailPoint: "hangBeforeUpsertPerformsInsert", mode: "alwaysOn"})); + + // Sanity check : 'config.tenantMigrationRecipients' collection count should be empty. + checkTenantMigrationRecipientStateCollCount(0); + + // Start the conflicting recipientSyncData cmds. + const recipientSyncDataCmd1 = startParallelShell( + funWithArgs(startRecipientSyncDataCmd, UUID(), tenantId, connectionString, readPreference), + primary.port); + const recipientSyncDataCmd2 = startParallelShell( + funWithArgs(startRecipientSyncDataCmd, UUID(), tenantId, connectionString, readPreference), + primary.port); + + // Wait until both the conflicting instances got started. + checkLog.containsWithCount(primary, "Starting tenant migration recipient instance", 2); + + jsTestLog("Waiting for 'hangBeforeUpsertPerformsInsert' failpoint to reach"); + waitForCurOpByFailPoint( + configDB, tenantMigrationRecipientStateCollNss, "hangBeforeUpsertPerformsInsert"); + + // Unblock the tenant migration instance from persisting the state doc. + assert.commandWorked( + primary.adminCommand({configureFailPoint: "hangBeforeUpsertPerformsInsert", mode: "off"})); + + // Wait for both the conflicting instances to complete. + recipientSyncDataCmd1(); + recipientSyncDataCmd2(); + + // Only one instance should have succeeded in persisting the state doc, other should have failed + // with ErrorCodes.ConflictingOperationInProgress. + checkTenantMigrationRecipientStateCollCount(1); +} + +{ + // Now, again call recipientSyncData cmd to run on the same tenant "test'. Since, our previous + // instance for tenant "test' wasn't garbage collected, the migration status for that tenant is + // considered as active. So, this command should fail with + // ErrorCodes.ConflictingOperationInProgress. + const recipientSyncDataCmd3 = startParallelShell( + funWithArgs(startRecipientSyncDataCmd, UUID(), tenantId, connectionString, readPreference), + primary.port); + recipientSyncDataCmd3(); + + // Collection count should remain the same. + checkTenantMigrationRecipientStateCollCount(1); +} + +rst.stopSet(); +})(); diff --git a/jstests/replsets/tenant_migration_invalid_inputs.js b/jstests/replsets/tenant_migration_invalid_inputs.js index 415c8261270..443cfb2fb14 100644 --- a/jstests/replsets/tenant_migration_invalid_inputs.js +++ b/jstests/replsets/tenant_migration_invalid_inputs.js @@ -1,7 +1,7 @@ /** - * Tests that the donorStartMigration command throws a error if the provided tenantId - * is unsupported (i.e. '', 'admin', 'local' or 'config') or if the recipient connection string - * matches the donor's connection string. + * Tests that the donorStartMigration & recipientSyncData command throws an error if the provided + * tenantId is unsupported (i.e. '', 'admin', 'local' or 'config') or if the recipient + * connection string matches the donor's connection string. * * @tags: [requires_fcv_47] */ @@ -14,17 +14,23 @@ const rst = rst.startSet(); rst.initiate(); const primary = rst.getPrimary(); +const tenantId = "test"; +const connectionString = "foo/bar:12345"; +const readPreference = { + mode: 'primary' +}; -// Test unsupported tenantIds. -const unsupportedTenantIds = ['', 'admin', 'local', 'config']; +jsTestLog("Testing 'donorStartMigration' command provided with invalid options."); -unsupportedTenantIds.forEach((tenantId) => { +// Test unsupported database prefixes. +const unsupportedtenantIds = ['', 'admin', 'local', 'config']; +unsupportedtenantIds.forEach((invalidTenantId) => { assert.commandFailedWithCode(primary.adminCommand({ donorStartMigration: 1, migrationId: UUID(), - recipientConnectionString: "testRecipientConnString", - tenantId: tenantId, - readPreference: {mode: "primary"} + recipientConnectionString: connectionString, + tenantId: invalidTenantId, + readPreference: readPreference }), ErrorCodes.BadValue); }); @@ -34,20 +40,70 @@ assert.commandFailedWithCode(primary.adminCommand({ donorStartMigration: 1, migrationId: UUID(), recipientConnectionString: rst.getURL(), - tenantId: "testTenantId", - readPreference: {mode: "primary"} + tenantId: tenantId, + readPreference: readPreference }), - ErrorCodes.InvalidOptions); + ErrorCodes.BadValue); -// Test migrating a database to a recipient that has one or more same hosts as donor -const conflictingRecipientConnectionString = "foo/bar:12345," + primary.host; +// Test migrating a database to a recipient that has one or more same hosts as donor. +const conflictingRecipientConnectionString = connectionString + "," + primary.host; assert.commandFailedWithCode(primary.adminCommand({ donorStartMigration: 1, migrationId: UUID(), recipientConnectionString: conflictingRecipientConnectionString, - tenantId: "testTenantId", - readPreference: {mode: "primary"} + tenantId: tenantId, + readPreference: readPreference +}), + ErrorCodes.BadValue); + +jsTestLog("Testing 'recipientSyncData' command provided with invalid options."); + +// Test unsupported database prefixes. +unsupportedtenantIds.forEach((invalidTenantId) => { + assert.commandFailedWithCode(primary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: connectionString, + tenantId: invalidTenantId, + readPreference: readPreference + }), + ErrorCodes.BadValue); +}); + +// Test migrating a database from recipient itself. +assert.commandFailedWithCode(primary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: rst.getURL(), + tenantId: tenantId, + readPreference: readPreference }), - ErrorCodes.InvalidOptions); + ErrorCodes.BadValue); + +// Test migrating a database from a donor that has one or more same hosts as recipient. +const conflictingDonorConnectionString = connectionString + "," + primary.host; +assert.commandFailedWithCode(primary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: conflictingDonorConnectionString, + tenantId: tenantId, + readPreference: readPreference +}), + ErrorCodes.BadValue); + +// Test 'returnAfterReachingTimestamp' can' be null. +const nullTimestamps = [Timestamp(0, 0), Timestamp(0, 1)]; +nullTimestamps.forEach((nullTs) => { + assert.commandFailedWithCode(primary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: connectionString, + tenantId: tenantId, + readPreference: readPreference, + returnAfterReachingTimestamp: nullTs + }), + ErrorCodes.BadValue); +}); + rst.stopSet(); })();
\ No newline at end of file diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 7b74363fa7e..d451c3cd4f3 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -462,6 +462,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/repl_server_parameters', '$BUILD_DIR/mongo/db/repl/tenant_migration_donor_service', + '$BUILD_DIR/mongo/db/repl/tenant_migration_recipient_service', '$BUILD_DIR/mongo/db/rw_concern_d', '$BUILD_DIR/mongo/db/s/sharding_runtime_d', '$BUILD_DIR/mongo/db/server_options_core', @@ -557,7 +558,9 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/connection_string', '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl', '$BUILD_DIR/mongo/idl/idl_parser', ] diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp index d451be395f3..467e4c4c9c9 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp @@ -55,25 +55,6 @@ public: const RequestType& requestBody = request(); - // Sanity check that donor and recipient do not share any of the same hosts for - // migration - const auto donorConnectionString = - repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); - const auto donorServers = donorConnectionString.getServers(); - const auto recipientServers = - uassertStatusOK( - MongoURI::parse(requestBody.getRecipientConnectionString().toString())) - .getServers(); - for (const auto& server : donorServers) { - uassert(ErrorCodes::InvalidOptions, - "recipient and donor hosts must be different", - std::none_of(recipientServers.begin(), - recipientServers.end(), - [&](const HostAndPort& recipientServer) { - return server == recipientServer; - })); - } - const auto donorStateDoc = TenantMigrationDonorDocument(requestBody.getMigrationId(), requestBody.getRecipientConnectionString().toString(), diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.idl b/src/mongo/db/commands/tenant_migration_donor_cmds.idl index 1c3b75c34d7..b1e3416a379 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.idl @@ -63,6 +63,8 @@ commands: recipientConnectionString: description: "The URI string that the donor will utilize to create a connection with the recipient." type: string + validator: + callback: "validateConnectionString" tenantId: description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed." type: string diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp index 7013f72be93..c20be7a533b 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp @@ -28,8 +28,11 @@ */ #include "mongo/db/commands.h" +#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h" #include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h" +#include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/tenant_migration_recipient_service.h" namespace mongo { namespace { @@ -37,16 +40,40 @@ namespace { class RecipientSyncDataCmd : public TypedCommand<RecipientSyncDataCmd> { public: using Request = RecipientSyncData; + using Response = RecipientSyncDataResponse; class Invocation : public InvocationBase { public: using InvocationBase::InvocationBase; - void typedRun(OperationContext* opCtx) { + Response typedRun(OperationContext* opCtx) { uassert(ErrorCodes::CommandNotSupported, "recipientSyncData command not enabled", repl::enableTenantMigrations); + + const auto& cmd = request(); + + TenantMigrationRecipientDocument stateDoc(cmd.getMigrationId(), + cmd.getDonorConnectionString().toString(), + cmd.getTenantId().toString(), + cmd.getReadPreference()); + + auto recipientService = + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(repl::TenantMigrationRecipientService:: + kTenantMigrationRecipientServiceName); + auto recipientInstance = repl::TenantMigrationRecipientService::Instance::getOrCreate( + opCtx, recipientService, stateDoc.toBSON()); + uassertStatusOK(recipientInstance->checkIfOptionsConflict(stateDoc)); + + auto returnAfterReachingTimestamp = cmd.getReturnAfterReachingTimestamp(); + if (!returnAfterReachingTimestamp) { + return Response(recipientInstance->waitUntilMigrationReachesConsistentState(opCtx)); + } + + return Response(recipientInstance->waitUntilTimestampIsMajorityCommitted( + opCtx, *returnAfterReachingTimestamp)); } void doCheckAuthorization(OperationContext* opCtx) const {} @@ -61,7 +88,8 @@ public: }; std::string help() const { - return "Instructs the recipient to sync data as part of a tenant migration."; + return "Internal replica set command; instructs the recipient to sync data as part of a " + "tenant migration."; } bool adminOnly() const override { diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl index 01a772c4c38..c43fdef58e1 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl @@ -31,6 +31,7 @@ global: cpp_includes: - "mongo/client/read_preference.h" - "mongo/db/repl/tenant_migration_util.h" + - "mongo/db/repl/optime.h" imports: - "mongo/client/read_preference_setting.idl" @@ -38,6 +39,15 @@ imports: - "mongo/s/sharding_types.idl" - "mongo/db/repl/replication_types.idl" +structs: + recipientSyncDataResponse: + description: "Response for the 'recipientSyncData' command" + strict: false + fields: + majorityAppliedDonorOpTime: + type: optime + description: "Majority applied donor optime by the recipient" + commands: recipientSyncData: description: "Parser for the 'recipientSyncData' command." @@ -48,20 +58,31 @@ commands: description: "Unique identifier for the tenant migration." type: uuid donorConnectionString: - description: "The URI string that the donor will utilize to create a connection with the recipient." + description: >- + The URI string that the recipient will utilize to create a connection with the + donor. type: string + validator: + callback: "validateConnectionString" tenantId: - description: "The prefix from which the migrating database will be matched. The prefixes 'admin', 'local', 'config', the empty string, are not allowed." + description: >- + The prefix from which the migrating database will be matched. The prefixes 'admin', + 'local', 'config', the empty string, are not allowed. type: string validator: callback: "validateDatabasePrefix" readPreference: - description: "The read preference settings that the donor will pass on to the recipient." + description: >- + The read preference settings that the donor will pass on to the recipient. type: readPreference - returnAfterReachingOpTime: - description: "If provided, the recipient should return after syncing up to this OpTime. Otherwise, the recipient will return once its copy of the data is consistent." + returnAfterReachingTimestamp: + description: >- + If provided, the recipient should return after syncing up to this timestamp. + Otherwise, the recipient will return once its copy of the data is consistent. type: timestamp optional: true + validator: + callback: "validateTimestampNotNull" recipientForgetMigration: description: "Parser for the 'recipientForgetMigration' command." diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h index 4ae7af8cdb9..5c19dbf8d02 100644 --- a/src/mongo/db/concurrency/d_concurrency.h +++ b/src/mongo/db/concurrency/d_concurrency.h @@ -81,8 +81,11 @@ public: : _rid(rid), _locker(locker), _result(LOCK_INVALID) {} ResourceLock(Locker* locker, ResourceId rid, LockMode mode) + : ResourceLock(nullptr, locker, rid, mode) {} + + ResourceLock(OperationContext* opCtx, Locker* locker, ResourceId rid, LockMode mode) : _rid(rid), _locker(locker), _result(LOCK_INVALID) { - lock(nullptr, mode); + lock(opCtx, mode); } ResourceLock(ResourceLock&& otherLock) @@ -165,7 +168,13 @@ public: class ExclusiveLock : public ResourceLock { public: ExclusiveLock(Locker* locker, ResourceMutex mutex) - : ResourceLock(locker, mutex.rid(), MODE_X) {} + : ExclusiveLock(nullptr, locker, mutex) {} + + /** + * Interruptible lock acquisition. + */ + ExclusiveLock(OperationContext* opCtx, Locker* locker, ResourceMutex mutex) + : ResourceLock(opCtx, locker, mutex.rid(), MODE_X) {} using ResourceLock::lock; @@ -185,8 +194,13 @@ public: */ class SharedLock : public ResourceLock { public: - SharedLock(Locker* locker, ResourceMutex mutex) - : ResourceLock(locker, mutex.rid(), MODE_IS) {} + SharedLock(Locker* locker, ResourceMutex mutex) : SharedLock(nullptr, locker, mutex) {} + + /** + * Interruptible lock acquisition. + */ + SharedLock(OperationContext* opCtx, Locker* locker, ResourceMutex mutex) + : ResourceLock(opCtx, locker, mutex.rid(), MODE_IS) {} }; /** diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 09993f84475..4730e429f1c 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -44,7 +44,6 @@ #include "mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_request.h" -#include "mongo/db/ops/update_result.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" @@ -224,21 +223,21 @@ bool Helpers::getLast(OperationContext* opCtx, const char* ns, BSONObj& result) return false; } -void Helpers::upsert(OperationContext* opCtx, - const string& ns, - const BSONObj& o, - bool fromMigrate) { +UpdateResult Helpers::upsert(OperationContext* opCtx, + const string& ns, + const BSONObj& o, + bool fromMigrate) { BSONElement e = o["_id"]; verify(e.type()); BSONObj id = e.wrap(); - upsert(opCtx, ns, id, o, fromMigrate); + return upsert(opCtx, ns, id, o, fromMigrate); } -void Helpers::upsert(OperationContext* opCtx, - const string& ns, - const BSONObj& filter, - const BSONObj& updateMod, - bool fromMigrate) { +UpdateResult Helpers::upsert(OperationContext* opCtx, + const string& ns, + const BSONObj& filter, + const BSONObj& updateMod, + bool fromMigrate) { OldClientContext context(opCtx, ns); const NamespaceString requestNs(ns); @@ -251,7 +250,7 @@ void Helpers::upsert(OperationContext* opCtx, request.setFromMigration(fromMigrate); request.setYieldPolicy(PlanYieldPolicy::YieldPolicy::NO_YIELD); - ::mongo::update(opCtx, context.db(), request); + return ::mongo::update(opCtx, context.db(), request); } void Helpers::update(OperationContext* opCtx, diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h index 339f5165455..17b1494c1ae 100644 --- a/src/mongo/db/dbhelpers.h +++ b/src/mongo/db/dbhelpers.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/namespace_string.h" +#include "mongo/db/ops/update_result.h" #include "mongo/db/record_id.h" namespace mongo { @@ -117,14 +118,14 @@ struct Helpers { static void putSingleton(OperationContext* opCtx, const char* ns, BSONObj obj); /** - * you have to lock + * Callers are expected to hold the collection lock. * you do not have to have Context set * o has to have an _id field or will assert */ - static void upsert(OperationContext* opCtx, - const std::string& ns, - const BSONObj& o, - bool fromMigrate = false); + static UpdateResult upsert(OperationContext* opCtx, + const std::string& ns, + const BSONObj& o, + bool fromMigrate = false); /** * Performs an upsert of 'updateMod' if we don't match the given 'filter'. @@ -132,11 +133,11 @@ struct Helpers { * Note: Query yielding is turned off, so both read and writes are performed * on the same storage snapshot. */ - static void upsert(OperationContext* opCtx, - const std::string& ns, - const BSONObj& filter, - const BSONObj& updateMod, - bool fromMigrate = false); + static UpdateResult upsert(OperationContext* opCtx, + const std::string& ns, + const BSONObj& filter, + const BSONObj& updateMod, + bool fromMigrate = false); /** * Performs an update of 'updateMod' for the entry matching the given 'filter'. diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 03498cf54b9..c765b83033b 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1476,6 +1476,7 @@ env.CppUnitTest( 'tenant_oplog_batcher_test.cpp', 'vote_requester_test.cpp', 'wait_for_majority_service_test.cpp', + 'tenant_migration_recipient_entry_helpers_test.cpp', 'tenant_migration_recipient_service_test.cpp', ], LIBDEPS=[ @@ -1556,6 +1557,7 @@ env.CppUnitTest( 'task_executor_mock', 'task_runner', 'tenant_migration_recipient_service', + 'tenant_migration_recipient_utils', 'tenant_oplog_processing', 'wait_for_majority_service', ], diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 1478ad2cf9c..6ae9760ccbd 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -448,7 +448,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa donorConnString.toString(), _stateDoc.getTenantId().toString(), _stateDoc.getReadPreference()); - request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp()); + request.setReturnAfterReachingTimestamp(_stateDoc.getBlockTimestamp()); return request.toBSON(BSONObj()); }()); diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp index de656c59ca6..e5e0ff5c0c9 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp @@ -36,71 +36,55 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" -#include "mongo/db/index_build_entry_helpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_request.h" +#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/util/str.h" + namespace mongo { namespace repl { -namespace { -/** - * Creates the tenant migration recipients collection if it doesn't exist. - * Note: Throws WriteConflictException if the collection already exist. - */ -CollectionPtr ensureTenantMigrationRecipientsCollectionExists(OperationContext* opCtx, - Database* db, - const NamespaceString& nss) { - // Sanity checks. - invariant(db); - invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)); - - auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); - if (!collection) { - WriteUnitOfWork wuow(opCtx); - - collection = db->createCollection(opCtx, nss, CollectionOptions()); - // Ensure the collection exists. - invariant(collection); - - wuow.commit(); - } - return collection; -} - -} // namespace namespace tenantMigrationRecipientEntryHelpers { Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) { const auto nss = NamespaceString::kTenantMigrationRecipientsNamespace; - return writeConflictRetry(opCtx, "insertStateDoc", nss.ns(), [&]() -> Status { - // TODO SERVER-50741: Should be replaced by AutoGetCollection. - AutoGetOrCreateDb db(opCtx, nss.db(), MODE_IX); - Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - - uassert(ErrorCodes::PrimarySteppedDown, - str::stream() << "No longer primary while attempting to insert tenant migration " - "recipient state document", - repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); - - // TODO SERVER-50741: ensureTenantMigrationRecipientsCollectionExists() should be removed - // and this should return ErrorCodes::NamespaceNotFound when collection is missing. - auto collection = ensureTenantMigrationRecipientsCollectionExists(opCtx, db.getDb(), nss); - - WriteUnitOfWork wuow(opCtx); - Status status = - collection->insertDocument(opCtx, InsertStatement(stateDoc.toBSON()), nullptr); - if (!status.isOK()) { - return status; - } - wuow.commit(); - return Status::OK(); - }); + AutoGetCollection collection(opCtx, nss, MODE_IX); + + // Sanity check + uassert(ErrorCodes::PrimarySteppedDown, + str::stream() << "No longer primary while attempting to insert tenant migration " + "recipient state document", + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); + + return writeConflictRetry( + opCtx, "insertTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> Status { + // Insert the 'stateDoc' if no active tenant migration found for the 'tenantId' provided + // in the 'stateDoc'. Tenant Migration is considered as active for a tenantId if a state + // document exists on the disk for that 'tenantId' and not marked to be garbage + // collected (i.e, 'expireAt' not set). + const auto filter = BSON(TenantMigrationRecipientDocument::kTenantIdFieldName + << stateDoc.getTenantId().toString() + << TenantMigrationRecipientDocument::kExpireAtFieldName + << BSON("$exists" << false)); + const auto updateMod = BSON("$setOnInsert" << stateDoc.toBSON()); + auto updateResult = + Helpers::upsert(opCtx, nss.ns(), filter, updateMod, /*fromMigrate=*/false); + + // '$setOnInsert' update operator can no way modify the existing on-disk state doc. + invariant(!updateResult.numDocsModified); + if (updateResult.upsertedId.isEmpty()) { + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Failed to insert the state doc: " << stateDoc.toBSON() + << "; Active tenant migration found for tenantId: " + << stateDoc.getTenantId()}; + } + return Status::OK(); + }); } Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc) { @@ -111,18 +95,19 @@ Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDoc return Status(ErrorCodes::NamespaceNotFound, str::stream() << nss.ns() << " does not exist"); } - auto updateReq = UpdateRequest(); - updateReq.setNamespaceString(nss); - updateReq.setQuery(BSON("_id" << stateDoc.getId())); - updateReq.setUpdateModification( - write_ops::UpdateModification::parseFromClassicUpdate(stateDoc.toBSON())); - auto updateResult = update(opCtx, collection.getDb(), updateReq); - if (updateResult.numMatched == 0) { - return {ErrorCodes::NoSuchKey, - str::stream() << "Existing Tenant Migration State Document not found for id: " - << stateDoc.getId()}; - } - return Status::OK(); + + return writeConflictRetry( + opCtx, "updateTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> Status { + auto updateResult = + Helpers::upsert(opCtx, nss.ns(), stateDoc.toBSON(), /*fromMigrate=*/false); + if (updateResult.numMatched == 0) { + return {ErrorCodes::NoSuchKey, + str::stream() + << "Existing Tenant Migration State Document not found for id: " + << stateDoc.getId()}; + } + return Status::OK(); + }); } StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx, diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h index 3a6d88beb61..998af60a60b 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.h @@ -42,10 +42,19 @@ namespace repl { namespace tenantMigrationRecipientEntryHelpers { /** - * Writes the state doc to the disk. + * Inserts the tenant migration recipient state document 'stateDoc' into + * 'config.tenantMigrationRecipients' collection. Also, creates the collection if not present + * before inserting the document. * - * Returns 'DuplicateKey' error code if a document already exists on the disk with the same - * 'migrationUUID'. + * NOTE: A state doc might get inserted based on a decision made out of a stale read within a + * storage transaction. Callers are expected to have their own concurrency mechanism to handle + * write skew problem. + * + * @Returns 'ConflictingOperationInProgress' error code if an active tenant migration found for the + * tenantId provided in the 'stateDoc'. + * + * Throws 'DuplicateKey' error code if a document already exists on the disk with the same + * 'migrationUUID', irrespective of the document marked for garbage collect or not. */ Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDocument& stateDoc); diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp new file mode 100644 index 00000000000..cb1cb69d146 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers_test.cpp @@ -0,0 +1,187 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/service_context_d_test_fixture.h" + +namespace mongo { +namespace repl { + +using namespace tenantMigrationRecipientEntryHelpers; + +class TenantMigrationRecipientEntryHelpersTest : public ServiceContextMongoDTest { +public: + void setUp() override { + ServiceContextMongoDTest::setUp(); + auto serviceContext = getServiceContext(); + + auto opCtx = cc().makeOperationContext(); + ReplicationCoordinator::set(serviceContext, + std::make_unique<ReplicationCoordinatorMock>(serviceContext)); + StorageInterface::set(serviceContext, std::make_unique<StorageInterfaceImpl>()); + + repl::setOplogCollectionName(serviceContext); + repl::createOplog(opCtx.get()); + + // Step up the node. + long long term = 1; + auto replCoord = ReplicationCoordinator::get(getServiceContext()); + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY)); + ASSERT_OK(replCoord->updateTerm(opCtx.get(), term)); + replCoord->setMyLastAppliedOpTimeAndWallTime( + OpTimeAndWallTime(OpTime(Timestamp(1, 1), term), Date_t())); + } + + void tearDown() override { + ServiceContextMongoDTest::tearDown(); + } + +protected: + bool checkStateDocPersisted(OperationContext* opCtx, + const TenantMigrationRecipientDocument& stateDoc) { + auto persistedStateDocWithStatus = getStateDoc(opCtx, stateDoc.getId()); + + auto status = persistedStateDocWithStatus.getStatus(); + if (status == ErrorCodes::NoMatchingDocument) { + return false; + } + ASSERT_OK(status); + + ASSERT_BSONOBJ_EQ(stateDoc.toBSON(), persistedStateDocWithStatus.getValue().toBSON()); + return true; + } +}; + +TEST_F(TenantMigrationRecipientEntryHelpersTest, AddTenantMigrationRecipientStateDoc) { + auto opCtx = cc().makeOperationContext(); + + const UUID migrationUUID = UUID::gen(); + TenantMigrationRecipientDocument activeTenantAStateDoc( + migrationUUID, + "DonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_OK(insertStateDoc(opCtx.get(), activeTenantAStateDoc)); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), activeTenantAStateDoc)); + + // Same migration uuid and same tenant id. + TenantMigrationRecipientDocument stateDoc1(migrationUUID, + "AnotherDonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + auto status = insertStateDoc(opCtx.get(), stateDoc1); + ASSERT_EQUALS(ErrorCodes::ConflictingOperationInProgress, status.code()); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), activeTenantAStateDoc)); + + // Same migration uuid and different tenant id. + TenantMigrationRecipientDocument stateDoc2(migrationUUID, + "DonorHost:12345", + "tenantB", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_THROWS_CODE( + insertStateDoc(opCtx.get(), stateDoc2), DBException, ErrorCodes::DuplicateKey); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), activeTenantAStateDoc)); + + // Different migration uuid and same tenant id. + TenantMigrationRecipientDocument stateDoc3(UUID::gen(), + "DonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + status = insertStateDoc(opCtx.get(), stateDoc3); + ASSERT_EQUALS(ErrorCodes::ConflictingOperationInProgress, status.code()); + ASSERT_FALSE(checkStateDocPersisted(opCtx.get(), stateDoc3)); + + // Different migration uuid and different tenant id. + TenantMigrationRecipientDocument stateDoc4(UUID::gen(), + "DonorHost:12345", + "tenantB", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_OK(insertStateDoc(opCtx.get(), stateDoc4)); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), stateDoc4)); +} + +TEST_F(TenantMigrationRecipientEntryHelpersTest, + AddTenantMigrationRecipientStateDoc_GarbageCollect) { + auto opCtx = cc().makeOperationContext(); + + const UUID migrationUUID = UUID::gen(); + TenantMigrationRecipientDocument inactiveTenantAStateDoc( + migrationUUID, + "DonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + inactiveTenantAStateDoc.setExpireAt(Date_t::now()); + ASSERT_OK(insertStateDoc(opCtx.get(), inactiveTenantAStateDoc)); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), inactiveTenantAStateDoc)); + + // Same migration uuid and same tenant id. + TenantMigrationRecipientDocument stateDoc1(migrationUUID, + "AnotherDonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_THROWS_CODE( + insertStateDoc(opCtx.get(), stateDoc1), DBException, ErrorCodes::DuplicateKey); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), inactiveTenantAStateDoc)); + + // Same migration uuid and different tenant id. + TenantMigrationRecipientDocument stateDoc2(migrationUUID, + "DonorHost:12345", + "tenantB", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_THROWS_CODE( + insertStateDoc(opCtx.get(), stateDoc2), DBException, ErrorCodes::DuplicateKey); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), inactiveTenantAStateDoc)); + + // Different migration uuid and same tenant id. + TenantMigrationRecipientDocument stateDoc3(UUID::gen(), + "DonorHost:12345", + "tenantA", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_OK(insertStateDoc(opCtx.get(), stateDoc3)); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), stateDoc3)); + + // Different migration uuid and different tenant id. + TenantMigrationRecipientDocument stateDoc4(UUID::gen(), + "DonorHost:12345", + "tenantC", + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + ASSERT_OK(insertStateDoc(opCtx.get(), stateDoc4)); + ASSERT_TRUE(checkStateDocPersisted(opCtx.get(), stateDoc4)); +} + +} // namespace repl +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 9b7842ac673..03901fc5fe8 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -163,11 +163,13 @@ ThreadPool::Limits TenantMigrationRecipientService::getThreadPoolLimits() const std::shared_ptr<PrimaryOnlyService::Instance> TenantMigrationRecipientService::constructInstance( BSONObj initialStateDoc) const { - return std::make_shared<TenantMigrationRecipientService::Instance>(initialStateDoc); + return std::make_shared<TenantMigrationRecipientService::Instance>(this, initialStateDoc); } -TenantMigrationRecipientService::Instance::Instance(BSONObj stateDoc) +TenantMigrationRecipientService::Instance::Instance( + const TenantMigrationRecipientService* recipientService, BSONObj stateDoc) : PrimaryOnlyService::TypedInstance<Instance>(), + _recipientService(recipientService), _stateDoc(TenantMigrationRecipientDocument::parse(IDLParserErrorContext("recipientStateDoc"), stateDoc)), _tenantId(_stateDoc.getTenantId().toString()), @@ -175,6 +177,25 @@ TenantMigrationRecipientService::Instance::Instance(BSONObj stateDoc) _donorConnectionString(_stateDoc.getDonorConnectionString().toString()), _readPreference(_stateDoc.getReadPreference()) {} +Status TenantMigrationRecipientService::Instance::checkIfOptionsConflict( + const TenantMigrationRecipientDocument& requestedStateDoc) const { + invariant(requestedStateDoc.getId() == _migrationUuid); + + if (requestedStateDoc.getTenantId() == _tenantId && + requestedStateDoc.getDonorConnectionString() == _donorConnectionString && + requestedStateDoc.getReadPreference().equals(_readPreference)) { + return Status::OK(); + } + + return Status(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Requested options for tenant migration doesn't match" + << " the active migration options, migrationId: " << _migrationUuid + << ", tenantId: " << _tenantId + << ", connectionString: " << _donorConnectionString + << ", readPreference: " << _readPreference.toString() + << ", requested options:" << requestedStateDoc.toBSON()); +} + OpTime TenantMigrationRecipientService::Instance::waitUntilMigrationReachesConsistentState( OperationContext* opCtx) const { return _dataConsistentPromise.getFuture().get(opCtx); @@ -314,7 +335,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( auto uniqueOpCtx = cc().makeOperationContext(); auto opCtx = uniqueOpCtx.get(); - LOGV2_DEBUG(5081400, 2, "Recipient migration service initializing state document", @@ -325,7 +345,11 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( // Persist the state doc before starting the data sync. _stateDoc.setState(TenantMigrationRecipientStateEnum::kStarted); - uassertStatusOK(tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc)); + { + Lock::ExclusiveLock stateDocInsertLock( + opCtx, opCtx->lockState(), _recipientService->_stateDocInsertMutex); + uassertStatusOK(tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc)); + } if (MONGO_unlikely(failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) { LOGV2(4878500, "Persisting state doc failed due to fail point enabled."); @@ -724,6 +748,13 @@ void TenantMigrationRecipientService::Instance::run( _scopedExecutor = executor; pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet(); + LOGV2(4879607, + "Starting tenant migration recipient instance: ", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "connectionString"_attr = _donorConnectionString, + "readPreference"_attr = _readPreference); + ExecutorFuture(**executor) .then([this] { stdx::lock_guard lk(_mutex); @@ -768,7 +799,7 @@ void TenantMigrationRecipientService::Instance::run( str::stream() << "Can't start the data sync as the state doc is already marked " "for garbage collect for migration uuid: " << getMigrationUUID(), - !_stateDoc.getGarbageCollect()); + !_stateDoc.getExpireAt()); _getStartOpTimesFromDonor(lk); auto opCtx = cc().makeOperationContext(); uassertStatusOK( diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index d8c80112d04..e0b460479be 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -75,7 +75,8 @@ public: class Instance final : public PrimaryOnlyService::TypedInstance<Instance> { public: - explicit Instance(BSONObj stateDoc); + explicit Instance(const TenantMigrationRecipientService* recipientService, + BSONObj stateDoc); void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept final; @@ -112,17 +113,24 @@ public: */ const std::string& getTenantId() const; + /** + * To be called on the instance returned by PrimaryOnlyService::getOrCreate(). Returns an + * error if the options this Instance was created with are incompatible with a request for + * an instance with the options given in 'options'. + */ + Status checkIfOptionsConflict(const TenantMigrationRecipientDocument& StateDoc) const; + /* * Blocks the thread until the tenant migration reaches consistent state in an interruptible * mode. Returns the donor optime at which the migration reached consistent state. Throws - * exceptions on error. + * exception on error. */ OpTime waitUntilMigrationReachesConsistentState(OperationContext* opCtx) const; /* * Blocks the thread until the tenant oplog applier applied data past the given 'donorTs' * in an interruptible mode. Returns the majority applied donor optime which may be greater - * or equal to given 'donorTs'. Throws throw exceptions on error. + * or equal to given 'donorTs'. Throws exception on error. */ OpTime waitUntilTimestampIsMajorityCommitted(OperationContext* opCtx, const Timestamp& donorTs) const; @@ -341,8 +349,9 @@ public: // (M) Reads and writes guarded by _mutex. // (W) Synchronization required only for writes. - std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // (M) - TenantMigrationRecipientDocument _stateDoc; // (M) + const TenantMigrationRecipientService* const _recipientService; // (R) (not owned) + std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // (M) + TenantMigrationRecipientDocument _stateDoc; // (M) // This data is provided in the initial state doc and never changes. We keep copies to // avoid having to obtain the mutex to access them. @@ -385,6 +394,16 @@ public: // Promise that is resolved Signaled when the tenant data sync has reached consistent point. SharedPromise<OpTime> _dataConsistentPromise; // (W) }; + +private: + /* + * Ensures that only one Instance is able to insert the initial state doc provided by the user, + * into NamespaceString::kTenantMigrationRecipientsNamespace collection at a time. + * + * No other locks should be held when locking this. RSTl/global/db/collection locks have to be + * taken after taking this. + */ + Lock::ResourceMutex _stateDocInsertMutex{"TenantMigrationRecipientStateDocInsert::mutex"}; }; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index 6e9c4a2cd6a..a23047231bf 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -132,12 +132,12 @@ structs: type: TenantMigrationRecipientState description: "The state of the tenant migration." default: kUninitialized - garbageCollect: - type: bool + expireAt: + type: date description: >- - A boolean that determines whether the state machine should be deleted - after a delay via the TTL monitor. - default: false + The wall-clock time at which the state machine document should be + removed by the TTL monitor. + optional: true startApplyingOpTime: description: >- Populated during data sync; the donor's operation time when the data diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index 04dc44fec88..84b0a1c6c8a 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -31,6 +31,9 @@ #include <set> #include "mongo/base/status.h" +#include "mongo/bson/timestamp.h" +#include "mongo/client/mongo_uri.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/util/str.h" namespace mongo { @@ -48,6 +51,37 @@ inline Status validateDatabasePrefix(const std::string& tenantId) { return isPrefixSupported ? Status::OK() : Status(ErrorCodes::BadValue, - str::stream() << "cannot migrate databases with prefix \'" << tenantId << "'"); + str::stream() << "cannot migrate databases for tenant \'" << tenantId << "'"); } + +inline Status validateTimestampNotNull(const Timestamp& ts) { + return (!ts.isNull()) + ? Status::OK() + : Status(ErrorCodes::BadValue, str::stream() << "Timestamp can't be null"); +} + +inline Status validateConnectionString(const StringData& donorOrRecipientConnectionString) { + // Sanity check to make sure that donor and recipient do not share any of the same hosts + // for tenant migration. + const auto donorOrRecipientServers = + uassertStatusOK(MongoURI::parse(donorOrRecipientConnectionString.toString())).getServers(); + + const auto servers = repl::ReplicationCoordinator::get(cc().getServiceContext()) + ->getConfig() + .getConnectionString() + .getServers(); + + for (auto&& server : servers) { + bool foundMatch = std::any_of( + donorOrRecipientServers.begin(), + donorOrRecipientServers.end(), + [&](const HostAndPort& donorOrRecipient) { return server == donorOrRecipient; }); + if (foundMatch) { + return Status(ErrorCodes::BadValue, + str::stream() << "Donor and recipient hosts must be different."); + } + } + return Status::OK(); +} + } // namespace mongo |