summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <cheahuychou.mao@mongodb.com>2020-09-01 19:22:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-09 02:11:58 +0000
commit3626a65b1d1f8ff2d230704146a09595f78bfe51 (patch)
treec9e6a45831bb3c29ff7d866cb0405c9e73482f78
parente21416d2889b54830624d812a48b2a5e07f4e47a (diff)
downloadmongo-3626a65b1d1f8ff2d230704146a09595f78bfe51.tar.gz
SERVER-49204 Implement donorForgetMigration command
-rw-r--r--jstests/libs/override_methods/inject_tenant_prefix.js6
-rw-r--r--jstests/replsets/tenant_migration_donor_state_machine.js156
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.cpp17
-rw-r--r--src/mongo/db/commands/tenant_migration_recipient_cmds.idl2
-rw-r--r--src/mongo/db/op_observer_impl.cpp2
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl13
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp127
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h38
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.cpp49
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_util.h5
-rw-r--r--src/mongo/db/repl/tenant_migration_state_machine.idl8
13 files changed, 304 insertions, 122 deletions
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index 0f87ca1af4c..658c8affb04 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -30,7 +30,7 @@ function prependDbPrefixToDbNameIfApplicable(dbName) {
// ignored.
return dbName;
}
- return isBlacklistedDb(dbName) ? dbName : TestData.tenantMigrationDbPrefix + "_" + dbName;
+ return isBlacklistedDb(dbName) ? dbName : TestData.dbPrefix + "_" + dbName;
}
/**
@@ -51,7 +51,7 @@ function prependDbPrefixToNsIfApplicable(ns) {
* If the given database name starts TestData.dbPrefix, removes the prefix.
*/
function extractOriginalDbName(dbName) {
- return dbName.replace(TestData.tenantMigrationDbPrefix + "_", "");
+ return dbName.replace(TestData.dbPrefix + "_", "");
}
/**
@@ -67,7 +67,7 @@ function extractOriginalNs(ns) {
* Removes all occurrences of TestDatabase.dbPrefix in the string.
*/
function removeDbPrefixFromString(string) {
- return string.replace(new RegExp(TestData.tenantMigrationDbPrefix + "_", "g"), "");
+ return string.replace(new RegExp(TestData.dbPrefix + "_", "g"), "");
}
/**
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index ceed0fe47a8..7f21efef1c6 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -1,6 +1,6 @@
/**
- * Tests the TenantMigrationAccessBlocker and donor state document are updated correctly after
- * the donorStartMigration command is run.
+ * Tests the TenantMigrationAccessBlocker and donor state document are updated correctly at each
+ * stage of the migration, and are eventually removed after the donorForgetMigration has returned.
*
* Tenant migrations are not expected to be run on servers with ephemeralForTest, and in particular
* this test fails on ephemeralForTest because the donor has to wait for the write to set the
@@ -15,6 +15,41 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
+load("jstests/libs/uuid_util.js");
+
+let expectedNumRecipientSyncDataCmdSent = 0;
+let expectedNumRecipientForgetMigrationCmdSent = 0;
+
+/**
+ * Runs the donorForgetMigration command and asserts that the TenantMigrationAccessBlocker and donor
+ * state document are eventually removed from the donor.
+ */
+function testDonorForgetMigration(donorRst, recipientRst, migrationId, dbPrefix) {
+ jsTest.log("Test donorForgetMigration after the migration completes");
+ const donorPrimary = donorRst.getPrimary();
+ const recipientPrimary = recipientRst.getPrimary();
+
+ assert.commandWorked(
+ donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}));
+
+ expectedNumRecipientForgetMigrationCmdSent++;
+ const recipientForgetMigrationMetrics =
+ recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientForgetMigration;
+ assert.eq(recipientForgetMigrationMetrics.failed, 0);
+ assert.eq(recipientForgetMigrationMetrics.total, expectedNumRecipientForgetMigrationCmdSent);
+
+ donorRst.nodes.forEach((node) => {
+ assert.soon(() =>
+ null == node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker);
+ });
+
+ assert.soon(
+ () => donorPrimary.getCollection(kConfigDonorsNS).count({databasePrefix: dbPrefix}) === 0);
+
+ const donorRecipientMonitorPoolStats =
+ donorPrimary.adminCommand({connPoolStats: 1}).replicaSets;
+ assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0);
+}
// An object that mirrors the access states for the TenantMigrationAccessBlocker.
const accessState = {
@@ -24,54 +59,58 @@ const accessState = {
kReject: 3
};
+// Use a shorter delay since the default delay is very large.
+const kGarbageCollectionDelayMS = 3 * 1000;
+
const donorRst = new ReplSetTest({
nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
- name: 'donor',
- nodeOptions: {setParameter: {enableTenantMigrations: true}}
+ name: "donor",
+ nodeOptions: {
+ setParameter: {
+ enableTenantMigrations: true,
+ tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS
+ }
+ }
});
const recipientRst = new ReplSetTest(
- {nodes: 1, name: 'recipient', nodeOptions: {setParameter: {enableTenantMigrations: true}}});
+ {nodes: 1, name: "recipient", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const kDBPrefix = 'testDb';
-const kConfigDonorsNS = "config.tenantMigrationDonors";
+donorRst.startSet();
+donorRst.initiate();
-let donorPrimary;
-let recipientPrimary;
-let kRecipientConnString;
+recipientRst.startSet();
+recipientRst.initiate();
-const setup = () => {
- donorRst.startSet();
- donorRst.initiate();
- recipientRst.startSet();
- recipientRst.initiate();
+const donorPrimary = donorRst.getPrimary();
+const recipientPrimary = recipientRst.getPrimary();
+const kRecipientConnString = recipientRst.getURL();
- donorPrimary = donorRst.getPrimary();
- recipientPrimary = recipientRst.getPrimary();
- kRecipientConnString = recipientRst.getURL();
-};
-const tearDown = () => {
- donorRst.stopSet();
- recipientRst.stopSet();
-};
+const kDBPrefix = "testDb";
+const kConfigDonorsNS = "config.tenantMigrationDonors";
+
+let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0});
(() => {
- // Test the case where the migration commits.
- setup();
- const dbName = kDBPrefix + "Commit";
+ jsTest.log("Test the case where the migration commits");
+ const migrationId = UUID();
- function startMigration(host, recipientConnString, dbName) {
+ function startMigration(host, recipientConnString, dbPrefix, migrationIdString) {
const primary = new Mongo(host);
assert.commandWorked(primary.adminCommand({
donorStartMigration: 1,
- migrationId: UUID(),
+ migrationId: UUID(migrationIdString),
recipientConnectionString: recipientConnString,
- databasePrefix: dbName,
+ databasePrefix: dbPrefix,
readPreference: {mode: "primary"}
}));
}
- let migrationThread =
- new Thread(startMigration, donorPrimary.host, kRecipientConnString, dbName);
+ let migrationThread = new Thread(startMigration,
+ donorPrimary.host,
+ kRecipientConnString,
+ kDBPrefix,
+ extractUUIDFromObject(migrationId));
let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
migrationThread.start();
@@ -79,12 +118,12 @@ const tearDown = () => {
blockingFp.wait();
let mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtab[dbName].access, accessState.kBlockingReadsAndWrites);
- assert(mtab[dbName].blockTimestamp);
+ assert.eq(mtab[kDBPrefix].access, accessState.kBlockingReadsAndWrites);
+ assert(mtab[kDBPrefix].blockTimestamp);
- let donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ let donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix});
let blockOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne(
- {ns: kConfigDonorsNS, op: "u", "o.databasePrefix": dbName});
+ {ns: kConfigDonorsNS, op: "u", "o.databasePrefix": kDBPrefix});
assert.eq(donorDoc.state, "blocking");
assert.eq(donorDoc.blockTimestamp, blockOplogEntry.ts);
@@ -93,60 +132,61 @@ const tearDown = () => {
migrationThread.join();
mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtab[dbName].access, accessState.kReject);
- assert(mtab[dbName].commitOrAbortOpTime);
+ assert.eq(mtab[kDBPrefix].access, accessState.kReject);
+ assert(mtab[kDBPrefix].commitOrAbortOpTime);
- donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix});
let commitOplogEntry =
donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "committed");
assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
- const donorRecipientMonitorPoolStats =
- donorPrimary.adminCommand({connPoolStats: 1}).replicaSets;
- assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0);
-
+ expectedNumRecipientSyncDataCmdSent += 2;
const recipientSyncDataMetrics =
recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData;
assert.eq(recipientSyncDataMetrics.failed, 0);
- assert.eq(recipientSyncDataMetrics.total, 2);
- tearDown();
+ assert.eq(recipientSyncDataMetrics.total, expectedNumRecipientSyncDataCmdSent);
+
+ testDonorForgetMigration(donorRst, recipientRst, migrationId, kDBPrefix);
})();
(() => {
- // Test the case where the migration aborts.
- setup();
- const dbName = kDBPrefix + "Abort";
+ jsTest.log("Test the case where the migration aborts");
+ const migrationId = UUID();
+
+ let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0});
let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
assert.commandFailedWithCode(donorPrimary.adminCommand({
donorStartMigration: 1,
- migrationId: UUID(),
+ migrationId: migrationId,
recipientConnectionString: kRecipientConnString,
- databasePrefix: dbName,
+ databasePrefix: kDBPrefix,
readPreference: {mode: "primary"}
}),
ErrorCodes.TenantMigrationAborted);
abortFp.off();
const mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtab[dbName].access, accessState.kAllow);
- assert(!mtab[dbName].commitOrAbortOpTime);
+ assert.eq(mtab[kDBPrefix].access, accessState.kAllow);
+ assert(!mtab[kDBPrefix].commitOrAbortOpTime);
- const donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ const donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix});
const abortOplogEntry =
donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "aborted");
assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts);
- const donorRecipientMonitorPoolStats =
- donorPrimary.adminCommand({connPoolStats: 1}).replicaSets;
- assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0);
-
+ expectedNumRecipientSyncDataCmdSent += 2;
const recipientSyncDataMetrics =
recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData;
assert.eq(recipientSyncDataMetrics.failed, 0);
- assert.eq(recipientSyncDataMetrics.total, 2);
- tearDown();
+ assert.eq(recipientSyncDataMetrics.total, expectedNumRecipientSyncDataCmdSent);
+
+ testDonorForgetMigration(donorRst, recipientRst, migrationId, kDBPrefix);
})();
+
+donorRst.stopSet();
+recipientRst.stopSet();
})();
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 8c355271195..f788660569e 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -399,6 +399,8 @@ error_codes:
- {code: 326, name: OplogQueryMinTsMissing}
+ - {code: 327, name: NoSuchTenantMigration}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
index e8627694a63..5478df86f57 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
@@ -88,7 +88,7 @@ public:
TenantMigrationDonorService::Instance::getOrCreate(donorService, donorStateDoc);
uassertStatusOK(donor->checkIfOptionsConflict(donorStateDoc));
- donor->getCompletionFuture().get();
+ donor->getDecisionFuture().get();
}
void doCheckAuthorization(OperationContext* opCtx) const {}
@@ -168,6 +168,21 @@ public:
uassert(ErrorCodes::CommandNotSupported,
"donorForgetMigration command not enabled",
repl::enableTenantMigrations);
+
+ const RequestType& requestBody = request();
+
+ auto donorService =
+ repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(TenantMigrationDonorService::kServiceName);
+ auto donor = TenantMigrationDonorService::Instance::lookup(
+ donorService, BSON("_id" << requestBody.getMigrationId()));
+ uassert(ErrorCodes::NoSuchTenantMigration,
+ str::stream() << "Could not find tenant migration with id "
+ << requestBody.getMigrationId(),
+ donor);
+
+ donor.get().get()->onReceiveDonorForgetMigration();
+ donor.get().get()->getCompletionFuture().get();
}
private:
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
index 29f5be9b3f4..27b0c745073 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -70,4 +70,4 @@ commands:
fields:
migrationId:
description: "Unique identifier for the tenant migration."
- type: uuid \ No newline at end of file
+ type: uuid
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 85fe5eca421..c13f444e191 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -574,7 +574,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc);
} else if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace) {
- tenant_migration_donor::onDonorStateTransition(opCtx, args.updateArgs.updatedDoc);
+ tenant_migration_donor::onDonorStateDocUpdate(opCtx, args.updateArgs.updatedDoc);
}
}
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 63a2626d1b3..d1fd2714f33 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1281,6 +1281,7 @@ env.Library(
],
LIBDEPS=[
'primary_only_service',
+ 'repl_server_parameters',
'tenant_migration_donor',
'wait_for_majority_service',
],
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 562d80ad126..4153ad4ef0e 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -330,4 +330,15 @@ server_parameters:
set_at: startup
cpp_vartype: bool
cpp_varname: enableTenantMigrations
- default: false \ No newline at end of file
+ default: false
+
+ tenantMigrationGarbageCollectionDelayMS:
+ description: >-
+ The amount of time in milliseconds that the donor or recipient should wait before
+ removing the migration state document after receiving donorForgetMigration or
+ recipientForgetMigration.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: tenantMigrationGarbageCollectionDelayMS
+ default:
+ expr: 48 * 60 * 60 * 1000
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 8f3fb868e7a..d821c6c3f26 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/tenant_migration_access_blocker.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/repl/tenant_migration_donor_util.h"
@@ -173,6 +174,34 @@ repl::OpTime TenantMigrationDonorService::Instance::_updateStateDocument(
return updateOpTime.get();
}
+repl::OpTime TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable() {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ DBDirectClient dbClient(opCtx);
+
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
+
+ auto commandResponse = dbClient.runCommand([&] {
+ write_ops::Update updateOp(_stateDocumentsNS);
+ auto updateModification =
+ write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON());
+ write_ops::UpdateOpEntry updateEntry(
+ BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()),
+ updateModification);
+ updateEntry.setMulti(false);
+ updateEntry.setUpsert(false);
+ updateOp.setUpdates({updateEntry});
+
+ return updateOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+}
+
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime) {
return WaitForMajorityService::get(_serviceContext)
@@ -180,27 +209,11 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit
.thenRunOn(**executor);
}
-ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
+ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient(
+ OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter) {
- if (skipSendingRecipientSyncDataCommand.shouldFail()) {
- return ExecutorFuture<void>(**executor, Status::OK());
- }
-
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- BSONObj cmdObj = BSONObj([&]() {
- auto donorConnString =
- repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
- RecipientSyncData request(_stateDoc.getId(),
- donorConnString.toString(),
- _stateDoc.getDatabasePrefix().toString(),
- _stateDoc.getReadPreference());
- request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp());
- return request.toBSON(BSONObj());
- }());
-
+ RemoteCommandTargeter* recipientTargeter,
+ const BSONObj& cmdObj) {
HostAndPort recipientHost =
uassertStatusOK(recipientTargeter->findHost(opCtx, ReadPreferenceSetting()));
@@ -238,6 +251,42 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
});
}
+ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter) {
+ if (skipSendingRecipientSyncDataCommand.shouldFail()) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ BSONObj cmdObj = BSONObj([&]() {
+ auto donorConnString =
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString();
+ RecipientSyncData request(_stateDoc.getId(),
+ donorConnString.toString(),
+ _stateDoc.getDatabasePrefix().toString(),
+ _stateDoc.getReadPreference());
+ request.setReturnAfterReachingOpTime(_stateDoc.getBlockTimestamp());
+ return request.toBSON(BSONObj());
+ }());
+
+ return _sendCommandToRecipient(opCtx, executor, recipientTargeter, cmdObj);
+}
+
+ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter) {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ return _sendCommandToRecipient(opCtx,
+ executor,
+ recipientTargeter,
+ RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj()));
+}
+
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
auto recipientUri =
@@ -298,15 +347,6 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
// Wait for the migration to commit or abort.
return _mtab->onCompletion();
})
- .onError([this](Status status) {
- if (!status.isOK() && _abortReason) {
- status.addContext(str::stream()
- << "Tenant migration with id \"" << _stateDoc.getId()
- << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix()
- << "\" aborted due to " << _abortReason);
- }
- return status;
- })
.onCompletion([this](Status status) {
LOGV2(5006601,
"Tenant migration completed",
@@ -314,6 +354,35 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"dbPrefix"_attr = _stateDoc.getDatabasePrefix(),
"status"_attr = status,
"abortReason"_attr = _abortReason);
+
+ if (status.isOK()) {
+ _decisionPromise.emplaceValue();
+ } else {
+ if (_abortReason) {
+ status.addContext(str::stream()
+ << "Tenant migration with id \"" << _stateDoc.getId()
+ << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix()
+ << "\" aborted due to " << _abortReason);
+ }
+ _decisionPromise.setError(status);
+ }
+ })
+ .then([this, executor] {
+ // Wait for the donorForgetMigration command.
+ return _receivedDonorForgetMigrationPromise.getFuture();
+ })
+ .then([this, executor] {
+ const auto opTime = _markStateDocumentAsGarbageCollectable();
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ })
+ .then([this, executor, recipientTargeter] {
+ return _sendRecipientForgetMigrationCommand(executor, recipientTargeter.get());
+ })
+ .onCompletion([this, executor](Status status) {
+ LOGV2(4920400,
+ "Marked migration state as garbage collectable",
+ "migrationId"_attr = _stateDoc.getId(),
+ "expireAt"_attr = _stateDoc.getExpireAt());
return status;
})
.semi();
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 30fe3a41461..ddf178121e4 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -80,6 +80,17 @@ public:
*/
Status checkIfOptionsConflict(BSONObj options);
+ /**
+ * Returns a Future that will be resolved when the migration has committed or aborted.
+ */
+ SharedSemiFuture<void> getDecisionFuture() const {
+ return _decisionPromise.getFuture();
+ }
+
+ void onReceiveDonorForgetMigration() {
+ _receivedDonorForgetMigrationPromise.emplaceValue();
+ }
+
private:
const NamespaceString _stateDocumentsNS = NamespaceString::kTenantMigrationDonorsNamespace;
@@ -98,24 +109,51 @@ public:
repl::OpTime _updateStateDocument(const TenantMigrationDonorStateEnum nextState);
/**
+ * Sets the "expireAt" time for the state document to be garbage collected.
+ */
+ repl::OpTime _markStateDocumentAsGarbageCollectable();
+
+ /**
* Waits for given opTime to be majority committed.
*/
ExecutorFuture<void> _waitForMajorityWriteConcern(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime);
/**
+ * Sends the given command to the recipient replica set.
+ */
+ ExecutorFuture<void> _sendCommandToRecipient(
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter,
+ const BSONObj& cmdObj);
+
+ /**
* Sends the recipientSyncData command to the recipient replica set.
*/
ExecutorFuture<void> _sendRecipientSyncDataCommand(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
RemoteCommandTargeter* recipientTargeter);
+ /**
+ * Sends the recipientForgetMigration command to the recipient replica set.
+ */
+ ExecutorFuture<void> _sendRecipientForgetMigrationCommand(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ RemoteCommandTargeter* recipientTargeter);
+
ServiceContext* _serviceContext;
TenantMigrationDonorDocument _stateDoc;
std::shared_ptr<TenantMigrationAccessBlocker> _mtab;
boost::optional<Status> _abortReason;
+
+ // Promise that is resolved when the donor has majority-committed the migration decision.
+ SharedPromise<void> _decisionPromise;
+
+ // Promise that is resolved when the donor receives the donorForgetMigration command.
+ SharedPromise<void> _receivedDonorForgetMigrationPromise;
};
private:
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index 296e041658d..edae74ab4ff 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -58,7 +58,8 @@ const char kNetName[] = "TenantMigrationWorkerNetwork";
* Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking
* state.
*/
-void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) {
+void onTransitionToBlocking(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
invariant(donorStateDoc.getBlockTimestamp());
@@ -91,7 +92,8 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
/**
* Transitions the TenantMigrationAccessBlocker to the committed state.
*/
-void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) {
+void onTransitionToCommitted(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kCommitted);
invariant(donorStateDoc.getCommitOrAbortOpTime());
@@ -105,7 +107,8 @@ void onTransitionToCommitted(OperationContext* opCtx, TenantMigrationDonorDocume
/**
* Transitions the TenantMigrationAccessBlocker to the aborted state.
*/
-void onTransitionToAborted(OperationContext* opCtx, TenantMigrationDonorDocument& donorStateDoc) {
+void onTransitionToAborted(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted);
invariant(donorStateDoc.getCommitOrAbortOpTime());
@@ -130,24 +133,28 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(
executor::makeNetworkInterface(kNetName, nullptr, nullptr));
}
-void onDonorStateTransition(OperationContext* opCtx, const BSONObj& donorStateDoc) {
- auto parsedDonorStateDoc =
- TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), donorStateDoc);
-
- switch (parsedDonorStateDoc.getState()) {
- case TenantMigrationDonorStateEnum::kDataSync:
- break;
- case TenantMigrationDonorStateEnum::kBlocking:
- onTransitionToBlocking(opCtx, parsedDonorStateDoc);
- break;
- case TenantMigrationDonorStateEnum::kCommitted:
- onTransitionToCommitted(opCtx, parsedDonorStateDoc);
- break;
- case TenantMigrationDonorStateEnum::kAborted:
- onTransitionToAborted(opCtx, parsedDonorStateDoc);
- break;
- default:
- MONGO_UNREACHABLE;
+void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson) {
+ auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"),
+ donorStateDocBson);
+ if (donorStateDoc.getExpireAt()) {
+ TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext())
+ .remove(donorStateDoc.getDatabasePrefix());
+ } else {
+ switch (donorStateDoc.getState()) {
+ case TenantMigrationDonorStateEnum::kDataSync:
+ break;
+ case TenantMigrationDonorStateEnum::kBlocking:
+ onTransitionToBlocking(opCtx, donorStateDoc);
+ break;
+ case TenantMigrationDonorStateEnum::kCommitted:
+ onTransitionToCommitted(opCtx, donorStateDoc);
+ break;
+ case TenantMigrationDonorStateEnum::kAborted:
+ onTransitionToAborted(opCtx, donorStateDoc);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
}
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 5b1d9ef4168..4d3376fe170 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -49,10 +49,9 @@ namespace tenant_migration_donor {
std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(ServiceContext* serviceContext);
/**
- * Updates the TenantMigrationAccessBlocker for the tenant migration represented by the given
- * config.migrationDonors document.
+ * Updates the donor's in-memory migration state to reflect the given persisted state.
*/
-void onDonorStateTransition(OperationContext* opCtx, const BSONObj& doc);
+void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson);
/**
* If the operation has read concern "snapshot" or includes afterClusterTime, and the database is
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 603c87e1bc7..e29354bac78 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -78,10 +78,10 @@ structs:
description:
"The opTime at which the donor's state document was set to 'committed' or 'aborted'."
optional: true
- garbageCollect:
- type: bool
- description: "A boolean that determines whether the state machine should be deleted after a delay via the TTL monitor."
- default: false
+ expireAt:
+ type: date
+ description: "The wall-clock time at which the state machine document should be removed by the TTL monitor."
+ optional: true
tenantMigrationRecipientDocument:
description: "Represents an in-progress tenant migration on the migration recipient."