From 3626a65b1d1f8ff2d230704146a09595f78bfe51 Mon Sep 17 00:00:00 2001 From: Cheahuychou Mao Date: Tue, 1 Sep 2020 19:22:18 +0000 Subject: SERVER-49204 Implement donorForgetMigration command --- .../libs/override_methods/inject_tenant_prefix.js | 6 +- .../tenant_migration_donor_state_machine.js | 156 +++++++++++++-------- src/mongo/base/error_codes.yml | 2 + .../db/commands/tenant_migration_donor_cmds.cpp | 17 ++- .../commands/tenant_migration_recipient_cmds.idl | 2 +- src/mongo/db/op_observer_impl.cpp | 2 +- src/mongo/db/repl/SConscript | 1 + src/mongo/db/repl/repl_server_parameters.idl | 13 +- .../db/repl/tenant_migration_donor_service.cpp | 127 +++++++++++++---- src/mongo/db/repl/tenant_migration_donor_service.h | 38 +++++ src/mongo/db/repl/tenant_migration_donor_util.cpp | 49 ++++--- src/mongo/db/repl/tenant_migration_donor_util.h | 5 +- .../db/repl/tenant_migration_state_machine.idl | 8 +- 13 files changed, 304 insertions(+), 122 deletions(-) diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index 0f87ca1af4c..658c8affb04 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -30,7 +30,7 @@ function prependDbPrefixToDbNameIfApplicable(dbName) { // ignored. return dbName; } - return isBlacklistedDb(dbName) ? dbName : TestData.tenantMigrationDbPrefix + "_" + dbName; + return isBlacklistedDb(dbName) ? dbName : TestData.dbPrefix + "_" + dbName; } /** @@ -51,7 +51,7 @@ function prependDbPrefixToNsIfApplicable(ns) { * If the given database name starts TestData.dbPrefix, removes the prefix. */ function extractOriginalDbName(dbName) { - return dbName.replace(TestData.tenantMigrationDbPrefix + "_", ""); + return dbName.replace(TestData.dbPrefix + "_", ""); } /** @@ -67,7 +67,7 @@ function extractOriginalNs(ns) { * Removes all occurrences of TestDatabase.dbPrefix in the string. */ function removeDbPrefixFromString(string) { - return string.replace(new RegExp(TestData.tenantMigrationDbPrefix + "_", "g"), ""); + return string.replace(new RegExp(TestData.dbPrefix + "_", "g"), ""); } /** diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js index ceed0fe47a8..7f21efef1c6 100644 --- a/jstests/replsets/tenant_migration_donor_state_machine.js +++ b/jstests/replsets/tenant_migration_donor_state_machine.js @@ -1,6 +1,6 @@ /** - * Tests the TenantMigrationAccessBlocker and donor state document are updated correctly after - * the donorStartMigration command is run. + * Tests the TenantMigrationAccessBlocker and donor state document are updated correctly at each + * stage of the migration, and are eventually removed after the donorForgetMigration has returned. * * Tenant migrations are not expected to be run on servers with ephemeralForTest, and in particular * this test fails on ephemeralForTest because the donor has to wait for the write to set the @@ -15,6 +15,41 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); + +let expectedNumRecipientSyncDataCmdSent = 0; +let expectedNumRecipientForgetMigrationCmdSent = 0; + +/** + * Runs the donorForgetMigration command and asserts that the TenantMigrationAccessBlocker and donor + * state document are eventually removed from the donor. + */ +function testDonorForgetMigration(donorRst, recipientRst, migrationId, dbPrefix) { + jsTest.log("Test donorForgetMigration after the migration completes"); + const donorPrimary = donorRst.getPrimary(); + const recipientPrimary = recipientRst.getPrimary(); + + assert.commandWorked( + donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId})); + + expectedNumRecipientForgetMigrationCmdSent++; + const recipientForgetMigrationMetrics = + recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientForgetMigration; + assert.eq(recipientForgetMigrationMetrics.failed, 0); + assert.eq(recipientForgetMigrationMetrics.total, expectedNumRecipientForgetMigrationCmdSent); + + donorRst.nodes.forEach((node) => { + assert.soon(() => + null == node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker); + }); + + assert.soon( + () => donorPrimary.getCollection(kConfigDonorsNS).count({databasePrefix: dbPrefix}) === 0); + + const donorRecipientMonitorPoolStats = + donorPrimary.adminCommand({connPoolStats: 1}).replicaSets; + assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0); +} // An object that mirrors the access states for the TenantMigrationAccessBlocker. const accessState = { @@ -24,54 +59,58 @@ const accessState = { kReject: 3 }; +// Use a shorter delay since the default delay is very large. +const kGarbageCollectionDelayMS = 3 * 1000; + const donorRst = new ReplSetTest({ nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], - name: 'donor', - nodeOptions: {setParameter: {enableTenantMigrations: true}} + name: "donor", + nodeOptions: { + setParameter: { + enableTenantMigrations: true, + tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS + } + } }); const recipientRst = new ReplSetTest( - {nodes: 1, name: 'recipient', nodeOptions: {setParameter: {enableTenantMigrations: true}}}); + {nodes: 1, name: "recipient", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const kDBPrefix = 'testDb'; -const kConfigDonorsNS = "config.tenantMigrationDonors"; +donorRst.startSet(); +donorRst.initiate(); -let donorPrimary; -let recipientPrimary; -let kRecipientConnString; +recipientRst.startSet(); +recipientRst.initiate(); -const setup = () => { - donorRst.startSet(); - donorRst.initiate(); - recipientRst.startSet(); - recipientRst.initiate(); +const donorPrimary = donorRst.getPrimary(); +const recipientPrimary = recipientRst.getPrimary(); +const kRecipientConnString = recipientRst.getURL(); - donorPrimary = donorRst.getPrimary(); - recipientPrimary = recipientRst.getPrimary(); - kRecipientConnString = recipientRst.getURL(); -}; -const tearDown = () => { - donorRst.stopSet(); - recipientRst.stopSet(); -}; +const kDBPrefix = "testDb"; +const kConfigDonorsNS = "config.tenantMigrationDonors"; + +let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); +configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0}); (() => { - // Test the case where the migration commits. - setup(); - const dbName = kDBPrefix + "Commit"; + jsTest.log("Test the case where the migration commits"); + const migrationId = UUID(); - function startMigration(host, recipientConnString, dbName) { + function startMigration(host, recipientConnString, dbPrefix, migrationIdString) { const primary = new Mongo(host); assert.commandWorked(primary.adminCommand({ donorStartMigration: 1, - migrationId: UUID(), + migrationId: UUID(migrationIdString), recipientConnectionString: recipientConnString, - databasePrefix: dbName, + databasePrefix: dbPrefix, readPreference: {mode: "primary"} })); } - let migrationThread = - new Thread(startMigration, donorPrimary.host, kRecipientConnString, dbName); + let migrationThread = new Thread(startMigration, + donorPrimary.host, + kRecipientConnString, + kDBPrefix, + extractUUIDFromObject(migrationId)); let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); migrationThread.start(); @@ -79,12 +118,12 @@ const tearDown = () => { blockingFp.wait(); let mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtab[dbName].access, accessState.kBlockingReadsAndWrites); - assert(mtab[dbName].blockTimestamp); + assert.eq(mtab[kDBPrefix].access, accessState.kBlockingReadsAndWrites); + assert(mtab[kDBPrefix].blockTimestamp); - let donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName}); + let donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix}); let blockOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne( - {ns: kConfigDonorsNS, op: "u", "o.databasePrefix": dbName}); + {ns: kConfigDonorsNS, op: "u", "o.databasePrefix": kDBPrefix}); assert.eq(donorDoc.state, "blocking"); assert.eq(donorDoc.blockTimestamp, blockOplogEntry.ts); @@ -93,60 +132,61 @@ const tearDown = () => { migrationThread.join(); mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtab[dbName].access, accessState.kReject); - assert(mtab[dbName].commitOrAbortOpTime); + assert.eq(mtab[kDBPrefix].access, accessState.kReject); + assert(mtab[kDBPrefix].commitOrAbortOpTime); - donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName}); + donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix}); let commitOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc}); assert.eq(donorDoc.state, "committed"); assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts); - const donorRecipientMonitorPoolStats = - donorPrimary.adminCommand({connPoolStats: 1}).replicaSets; - assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0); - + expectedNumRecipientSyncDataCmdSent += 2; const recipientSyncDataMetrics = recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData; assert.eq(recipientSyncDataMetrics.failed, 0); - assert.eq(recipientSyncDataMetrics.total, 2); - tearDown(); + assert.eq(recipientSyncDataMetrics.total, expectedNumRecipientSyncDataCmdSent); + + testDonorForgetMigration(donorRst, recipientRst, migrationId, kDBPrefix); })(); (() => { - // Test the case where the migration aborts. - setup(); - const dbName = kDBPrefix + "Abort"; + jsTest.log("Test the case where the migration aborts"); + const migrationId = UUID(); + + let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0}); let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts"); assert.commandFailedWithCode(donorPrimary.adminCommand({ donorStartMigration: 1, - migrationId: UUID(), + migrationId: migrationId, recipientConnectionString: kRecipientConnString, - databasePrefix: dbName, + databasePrefix: kDBPrefix, readPreference: {mode: "primary"} }), ErrorCodes.TenantMigrationAborted); abortFp.off(); const mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtab[dbName].access, accessState.kAllow); - assert(!mtab[dbName].commitOrAbortOpTime); + assert.eq(mtab[kDBPrefix].access, accessState.kAllow); + assert(!mtab[kDBPrefix].commitOrAbortOpTime); - const donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName}); + const donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix}); const abortOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc}); assert.eq(donorDoc.state, "aborted"); assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts); - const donorRecipientMonitorPoolStats = - donorPrimary.adminCommand({connPoolStats: 1}).replicaSets; - assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0); - + expectedNumRecipientSyncDataCmdSent += 2; const recipientSyncDataMetrics = recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData; assert.eq(recipientSyncDataMetrics.failed, 0); - assert.eq(recipientSyncDataMetrics.total, 2); - tearDown(); + assert.eq(recipientSyncDataMetrics.total, expectedNumRecipientSyncDataCmdSent); + + testDonorForgetMigration(donorRst, recipientRst, migrationId, kDBPrefix); })(); + +donorRst.stopSet(); +recipientRst.stopSet(); })(); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 8c355271195..f788660569e 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -399,6 +399,8 @@ error_codes: - {code: 326, name: OplogQueryMinTsMissing} + - {code: 327, name: NoSuchTenantMigration} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp index e8627694a63..5478df86f57 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp @@ -88,7 +88,7 @@ public: TenantMigrationDonorService::Instance::getOrCreate(donorService, donorStateDoc); uassertStatusOK(donor->checkIfOptionsConflict(donorStateDoc)); - donor->getCompletionFuture().get(); + donor->getDecisionFuture().get(); } void doCheckAuthorization(OperationContext* opCtx) const {} @@ -168,6 +168,21 @@ public: uassert(ErrorCodes::CommandNotSupported, "donorForgetMigration command not enabled", repl::enableTenantMigrations); + + const RequestType& requestBody = request(); + + auto donorService = + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(TenantMigrationDonorService::kServiceName); + auto donor = TenantMigrationDonorService::Instance::lookup( + donorService, BSON("_id" << requestBody.getMigrationId())); + uassert(ErrorCodes::NoSuchTenantMigration, + str::stream() << "Could not find tenant migration with id " + << requestBody.getMigrationId(), + donor); + + donor.get().get()->onReceiveDonorForgetMigration(); + donor.get().get()->getCompletionFuture().get(); } private: diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl index 29f5be9b3f4..27b0c745073 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl @@ -70,4 +70,4 @@ commands: fields: migrationId: description: "Unique identifier for the tenant migration." - type: uuid \ No newline at end of file + type: uuid diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 85fe5eca421..c13f444e191 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -574,7 +574,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc); } else if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace) { - tenant_migration_donor::onDonorStateTransition(opCtx, args.updateArgs.updatedDoc); + tenant_migration_donor::onDonorStateDocUpdate(opCtx, args.updateArgs.updatedDoc); } } diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 63a2626d1b3..d1fd2714f33 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1281,6 +1281,7 @@ env.Library( ], LIBDEPS=[ 'primary_only_service', + 'repl_server_parameters', 'tenant_migration_donor', 'wait_for_majority_service', ], diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 562d80ad126..4153ad4ef0e 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -330,4 +330,15 @@ server_parameters: set_at: startup cpp_vartype: bool cpp_varname: enableTenantMigrations - default: false \ No newline at end of file + default: false + + tenantMigrationGarbageCollectionDelayMS: + description: >- + The amount of time in milliseconds that the donor or recipient should wait before + removing the migration state document after receiving donorForgetMigration or + recipientForgetMigration. + set_at: [ startup, runtime ] + cpp_vartype: AtomicWord + cpp_varname: tenantMigrationGarbageCollectionDelayMS + default: + expr: 48 * 60 * 60 * 1000 diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 8f3fb868e7a..d821c6c3f26 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -38,6 +38,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/tenant_migration_access_blocker.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/repl/tenant_migration_donor_util.h" @@ -173,6 +174,34 @@ repl::OpTime TenantMigrationDonorService::Instance::_updateStateDocument( return updateOpTime.get(); } +repl::OpTime TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable() { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + DBDirectClient dbClient(opCtx); + + _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); + + auto commandResponse = dbClient.runCommand([&] { + write_ops::Update updateOp(_stateDocumentsNS); + auto updateModification = + write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON()); + write_ops::UpdateOpEntry updateEntry( + BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()), + updateModification); + updateEntry.setMulti(false); + updateEntry.setUpsert(false); + updateOp.setUpdates({updateEntry}); + + return updateOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); +} + ExecutorFuture TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern( const std::shared_ptr& executor, repl::OpTime opTime) { return WaitForMajorityService::get(_serviceContext) @@ -180,27 +209,11 @@ ExecutorFuture TenantMigrationDonorService::Instance::_waitForMajorityWrit .thenRunOn(**executor); } -ExecutorFuture TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( +ExecutorFuture TenantMigrationDonorService::Instance::_sendCommandToRecipient( + OperationContext* opCtx, const std::shared_ptr& executor, - RemoteCommandTargeter* recipientTargeter) { - if (skipSendingRecipientSyncDataCommand.shouldFail()) { - return ExecutorFuture(**executor, Status::OK()); - } - - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - BSONObj cmdObj = BSONObj([&]() { - auto donorConnString = - repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); - RecipientSyncData request(_stateDoc.getId(), - donorConnString.toString(), - _stateDoc.getDatabasePrefix().toString(), - _stateDoc.getReadPreference()); - request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp()); - return request.toBSON(BSONObj()); - }()); - + RemoteCommandTargeter* recipientTargeter, + const BSONObj& cmdObj) { HostAndPort recipientHost = uassertStatusOK(recipientTargeter->findHost(opCtx, ReadPreferenceSetting())); @@ -238,6 +251,42 @@ ExecutorFuture TenantMigrationDonorService::Instance::_sendRecipientSyncDa }); } +ExecutorFuture TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( + const std::shared_ptr& executor, + RemoteCommandTargeter* recipientTargeter) { + if (skipSendingRecipientSyncDataCommand.shouldFail()) { + return ExecutorFuture(**executor, Status::OK()); + } + + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + BSONObj cmdObj = BSONObj([&]() { + auto donorConnString = + repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(); + RecipientSyncData request(_stateDoc.getId(), + donorConnString.toString(), + _stateDoc.getDatabasePrefix().toString(), + _stateDoc.getReadPreference()); + request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp()); + return request.toBSON(BSONObj()); + }()); + + return _sendCommandToRecipient(opCtx, executor, recipientTargeter, cmdObj); +} + +ExecutorFuture TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand( + const std::shared_ptr& executor, + RemoteCommandTargeter* recipientTargeter) { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + return _sendCommandToRecipient(opCtx, + executor, + recipientTargeter, + RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj())); +} + SemiFuture TenantMigrationDonorService::Instance::run( std::shared_ptr executor) noexcept { auto recipientUri = @@ -298,15 +347,6 @@ SemiFuture TenantMigrationDonorService::Instance::run( // Wait for the migration to commit or abort. return _mtab->onCompletion(); }) - .onError([this](Status status) { - if (!status.isOK() && _abortReason) { - status.addContext(str::stream() - << "Tenant migration with id \"" << _stateDoc.getId() - << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix() - << "\" aborted due to " << _abortReason); - } - return status; - }) .onCompletion([this](Status status) { LOGV2(5006601, "Tenant migration completed", @@ -314,6 +354,35 @@ SemiFuture TenantMigrationDonorService::Instance::run( "dbPrefix"_attr = _stateDoc.getDatabasePrefix(), "status"_attr = status, "abortReason"_attr = _abortReason); + + if (status.isOK()) { + _decisionPromise.emplaceValue(); + } else { + if (_abortReason) { + status.addContext(str::stream() + << "Tenant migration with id \"" << _stateDoc.getId() + << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix() + << "\" aborted due to " << _abortReason); + } + _decisionPromise.setError(status); + } + }) + .then([this, executor] { + // Wait for the donorForgetMigration command. + return _receivedDonorForgetMigrationPromise.getFuture(); + }) + .then([this, executor] { + const auto opTime = _markStateDocumentAsGarbageCollectable(); + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }) + .then([this, executor, recipientTargeter] { + return _sendRecipientForgetMigrationCommand(executor, recipientTargeter.get()); + }) + .onCompletion([this, executor](Status status) { + LOGV2(4920400, + "Marked migration state as garbage collectable", + "migrationId"_attr = _stateDoc.getId(), + "expireAt"_attr = _stateDoc.getExpireAt()); return status; }) .semi(); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 30fe3a41461..ddf178121e4 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -80,6 +80,17 @@ public: */ Status checkIfOptionsConflict(BSONObj options); + /** + * Returns a Future that will be resolved when the migration has committed or aborted. + */ + SharedSemiFuture getDecisionFuture() const { + return _decisionPromise.getFuture(); + } + + void onReceiveDonorForgetMigration() { + _receivedDonorForgetMigrationPromise.emplaceValue(); + } + private: const NamespaceString _stateDocumentsNS = NamespaceString::kTenantMigrationDonorsNamespace; @@ -97,12 +108,26 @@ public: */ repl::OpTime _updateStateDocument(const TenantMigrationDonorStateEnum nextState); + /** + * Sets the "expireAt" time for the state document to be garbage collected. + */ + repl::OpTime _markStateDocumentAsGarbageCollectable(); + /** * Waits for given opTime to be majority committed. */ ExecutorFuture _waitForMajorityWriteConcern( const std::shared_ptr& executor, repl::OpTime opTime); + /** + * Sends the given command to the recipient replica set. + */ + ExecutorFuture _sendCommandToRecipient( + OperationContext* opCtx, + const std::shared_ptr& executor, + RemoteCommandTargeter* recipientTargeter, + const BSONObj& cmdObj); + /** * Sends the recipientSyncData command to the recipient replica set. */ @@ -110,12 +135,25 @@ public: const std::shared_ptr& executor, RemoteCommandTargeter* recipientTargeter); + /** + * Sends the recipientForgetMigration command to the recipient replica set. + */ + ExecutorFuture _sendRecipientForgetMigrationCommand( + const std::shared_ptr& executor, + RemoteCommandTargeter* recipientTargeter); + ServiceContext* _serviceContext; TenantMigrationDonorDocument _stateDoc; std::shared_ptr _mtab; boost::optional _abortReason; + + // Promise that is resolved when the donor has majority-committed the migration decision. + SharedPromise _decisionPromise; + + // Promise that is resolved when the donor receives the donorForgetMigration command. + SharedPromise _receivedDonorForgetMigrationPromise; }; private: diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp index 296e041658d..edae74ab4ff 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp @@ -58,7 +58,8 @@ const char kNetName[] = "TenantMigrationWorkerNetwork"; * Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking * state. */ -void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) { +void onTransitionToBlocking(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kBlocking); invariant(donorStateDoc.getBlockTimestamp()); @@ -91,7 +92,8 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen /** * Transitions the TenantMigrationAccessBlocker to the committed state. */ -void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) { +void onTransitionToCommitted(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kCommitted); invariant(donorStateDoc.getCommitOrAbortOpTime()); @@ -105,7 +107,8 @@ void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocume /** * Transitions the TenantMigrationAccessBlocker to the aborted state. */ -void onTransitionToAborted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) { +void onTransitionToAborted(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted); invariant(donorStateDoc.getCommitOrAbortOpTime()); @@ -130,24 +133,28 @@ std::unique_ptr makeTenantMigrationExecutor( executor::makeNetworkInterface(kNetName, nullptr, nullptr)); } -void onDonorStateTransition(OperationContext* opCtx, const BSONObj& donorStateDoc) { - auto parsedDonorStateDoc = - TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), donorStateDoc); - - switch (parsedDonorStateDoc.getState()) { - case TenantMigrationDonorStateEnum::kDataSync: - break; - case TenantMigrationDonorStateEnum::kBlocking: - onTransitionToBlocking(opCtx, parsedDonorStateDoc); - break; - case TenantMigrationDonorStateEnum::kCommitted: - onTransitionToCommitted(opCtx, parsedDonorStateDoc); - break; - case TenantMigrationDonorStateEnum::kAborted: - onTransitionToAborted(opCtx, parsedDonorStateDoc); - break; - default: - MONGO_UNREACHABLE; +void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson) { + auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), + donorStateDocBson); + if (donorStateDoc.getExpireAt()) { + TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .remove(donorStateDoc.getDatabasePrefix()); + } else { + switch (donorStateDoc.getState()) { + case TenantMigrationDonorStateEnum::kDataSync: + break; + case TenantMigrationDonorStateEnum::kBlocking: + onTransitionToBlocking(opCtx, donorStateDoc); + break; + case TenantMigrationDonorStateEnum::kCommitted: + onTransitionToCommitted(opCtx, donorStateDoc); + break; + case TenantMigrationDonorStateEnum::kAborted: + onTransitionToAborted(opCtx, donorStateDoc); + break; + default: + MONGO_UNREACHABLE; + } } } diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h index 5b1d9ef4168..4d3376fe170 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.h +++ b/src/mongo/db/repl/tenant_migration_donor_util.h @@ -49,10 +49,9 @@ namespace tenant_migration_donor { std::unique_ptr makeTenantMigrationExecutor(ServiceContext* serviceContext); /** - * Updates the TenantMigrationAccessBlocker for the tenant migration represented by the given - * config.migrationDonors document. + * Updates the donor's in-memory migration state to reflect the given persisted state. */ -void onDonorStateTransition(OperationContext* opCtx, const BSONObj& doc); +void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson); /** * If the operation has read concern "snapshot" or includes afterClusterTime, and the database is diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index 603c87e1bc7..e29354bac78 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -78,10 +78,10 @@ structs: description: "The opTime at which the donor's state document was set to 'committed' or 'aborted'." optional: true - garbageCollect: - type: bool - description: "A boolean that determines whether the state machine should be deleted after a delay via the TTL monitor." - default: false + expireAt: + type: date + description: "The wall-clock time at which the state machine document should be removed by the TTL monitor." + optional: true tenantMigrationRecipientDocument: description: "Represents an in-progress tenant migration on the migration recipient." -- cgit v1.2.1