author     Cheahuychou Mao <cheahuychou.mao@mongodb.com>  2020-09-10 22:06:30 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-17 18:42:37 +0000
commit     96f566cef77449cf4dd304840fe770393106fc65 (patch)
tree       bb16ad0219c6ca38a214566f539d7be53c8d1047
parent     a7714cfefeb609b30bbf2d9340418ce809d32146 (diff)
download   mongo-96f566cef77449cf4dd304840fe770393106fc65.tar.gz
SERVER-50616 TenantMigrationDonor should retry its steps until it succeeds, the donor node steps down or shuts down, or it gets an error that should lead to an abort decision
-rw-r--r--  jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js  38
-rw-r--r--  jstests/replsets/libs/tenant_migration_util.js  36
-rw-r--r--  jstests/replsets/tenant_migration_donor_failover_and_shutdown.js  141
-rw-r--r--  jstests/replsets/tenant_migration_donor_retry.js  278
-rw-r--r--  jstests/replsets/tenant_migration_donor_state_machine.js  58
-rw-r--r--  jstests/replsets/writes_during_tenant_migration.js  14
-rw-r--r--  src/mongo/db/commands/tenant_migration_donor_cmds.cpp  4
-rw-r--r--  src/mongo/db/repl/SConscript  1
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp  432
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.h  28
-rw-r--r--  src/mongo/db/s/SConscript  1
-rw-r--r--  src/mongo/util/SConscript  11
-rw-r--r--  src/mongo/util/future_util.cpp  59
-rw-r--r--  src/mongo/util/future_util.h  25
14 files changed, 847 insertions, 279 deletions
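
At a high level, the change wraps each donor step (persisting the state document, updating it, and sending commands to the recipient) in an AsyncTry loop from mongo/util/future_util.h, which re-runs the step on the scoped task executor until the step succeeds, the node is stepping down or shutting down, or the error is one that should lead to an abort decision. Below is a minimal illustrative sketch of that pattern, not code from this commit; retryDonorStep() and the attemptStep callback are hypothetical names.

// Illustrative sketch only: mirrors the AsyncTry/until/on retry loops added in
// tenant_migration_donor_service.cpp. retryDonorStep() and attemptStep are
// hypothetical; the real loops live in TenantMigrationDonorService::Instance.
#include <functional>
#include <memory>

#include "mongo/base/error_codes.h"
#include "mongo/base/status.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/future_util.h"

namespace mongo {

// Same stop criterion the commit uses for state-document writes: stop retrying on
// success, on shutdown, or when this node is no longer primary.
bool shouldStopRetrying(const Status& status) {
    return status.isOK() || ErrorCodes::isShutdownError(status) ||
        ErrorCodes::isNotPrimaryError(status);
}

// Keep re-running 'attemptStep' on the task executor until the outcome is final.
// A continuation that returns Status resolves to an ExecutorFuture<void>.
ExecutorFuture<void> retryDonorStep(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                    std::function<Status()> attemptStep) {
    return AsyncTry([attemptStep] { return attemptStep(); })
        .until([](Status status) { return shouldStopRetrying(status); })
        .on(**executor);
}

}  // namespace mongo

In the diff below, the stop predicate is split into shouldStopInsertingDonorStateDoc(), shouldStopUpdatingDonorStateDoc(), and shouldStopSendingRecipientCommand(), and the per-step bodies return repl::OpTime or Status as appropriate.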
diff --git a/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js b/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js
index ee567a13290..9d79de3a0a3 100644
--- a/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js
+++ b/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js
@@ -9,20 +9,7 @@
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-
-/**
- * Starts a tenant migration on the given donor primary according the given migration options.
- */
-function startMigration(donorPrimaryHost, migrationOpts) {
- const donorPrimary = new Mongo(donorPrimaryHost);
- return donorPrimary.adminCommand({
- donorStartMigration: 1,
- migrationId: UUID(migrationOpts.migrationIdString),
- recipientConnectionString: migrationOpts.recipientConnString,
- databasePrefix: migrationOpts.dbPrefix,
- readPreference: migrationOpts.readPreference
- });
-}
+load("jstests/replsets/libs/tenant_migration_util.js");
/**
* Asserts that the number of recipientDataSync commands executed on the given recipient primary is
@@ -79,8 +66,8 @@ let numRecipientSyncDataCmdSent = 0;
readPreference: {mode: "primary"}
};
- assert.commandWorked(startMigration(rst0Primary.host, migrationOpts));
- assert.commandWorked(startMigration(rst0Primary.host, migrationOpts));
+ assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts));
+ assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts));
// If the second donorStartMigration had started a duplicate migration, the recipient would have
// received four recipientSyncData commands instead of two.
@@ -97,8 +84,10 @@ let numRecipientSyncDataCmdSent = 0;
readPreference: {mode: "primary"}
};
- let migrationThread0 = new Thread(startMigration, rst0Primary.host, migrationOpts);
- let migrationThread1 = new Thread(startMigration, rst0Primary.host, migrationOpts);
+ let migrationThread0 =
+ new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts);
+ let migrationThread1 =
+ new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts);
migrationThread0.start();
migrationThread1.start();
@@ -121,9 +110,10 @@ let numRecipientSyncDataCmdSent = 0;
*/
function testStartingConflictingMigrationAfterInitialMigrationCommitted(
donorPrimary, migrationOpts0, migrationOpts1) {
- assert.commandWorked(startMigration(donorPrimary.host, migrationOpts0));
- assert.commandFailedWithCode(startMigration(donorPrimary.host, migrationOpts1),
- ErrorCodes.ConflictingOperationInProgress);
+ assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts0));
+ assert.commandFailedWithCode(
+ TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts1),
+ ErrorCodes.ConflictingOperationInProgress);
// If the second donorStartMigration had started a duplicate migration, there would be two donor
// state docs.
@@ -136,8 +126,10 @@ function testStartingConflictingMigrationAfterInitialMigrationCommitted(
* migrations, only one of the migrations will start and succeed.
*/
function testConcurrentConflictingMigrations(donorPrimary, migrationOpts0, migrationOpts1) {
- let migrationThread0 = new Thread(startMigration, rst0Primary.host, migrationOpts0);
- let migrationThread1 = new Thread(startMigration, rst0Primary.host, migrationOpts1);
+ let migrationThread0 =
+ new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0);
+ let migrationThread1 =
+ new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts1);
migrationThread0.start();
migrationThread1.start();
diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js
new file mode 100644
index 00000000000..41750650d06
--- /dev/null
+++ b/jstests/replsets/libs/tenant_migration_util.js
@@ -0,0 +1,36 @@
+/**
+ * Utilities for testing tenant migrations.
+ */
+var TenantMigrationUtil = (function() {
+ // An object that mirrors the access states for the TenantMigrationAccessBlocker.
+ const accessState = {kAllow: 0, kBlockingWrites: 1, kBlockingReadsAndWrites: 2, kReject: 3};
+
+ function startMigration(donorPrimaryHost, migrationOpts) {
+ const donorPrimary = new Mongo(donorPrimaryHost);
+ return donorPrimary.adminCommand({
+ donorStartMigration: 1,
+ migrationId: UUID(migrationOpts.migrationIdString),
+ recipientConnectionString: migrationOpts.recipientConnString,
+ databasePrefix: migrationOpts.dbPrefix,
+ readPreference: migrationOpts.readPreference
+ });
+ }
+
+ function forgetMigration(donorPrimaryHost, migrationIdString) {
+ const migrationId = UUID(migrationIdString);
+ const donorPrimary = new Mongo(donorPrimaryHost);
+ while (true) {
+ let res =
+ donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId});
+ if (res.ok || res.code != ErrorCodes.NoSuchTenantMigration) {
+ return res;
+ }
+ }
+ }
+
+ return {
+ accessState,
+ startMigration,
+ forgetMigration,
+ };
+})();
\ No newline at end of file
diff --git a/jstests/replsets/tenant_migration_donor_failover_and_shutdown.js b/jstests/replsets/tenant_migration_donor_failover_and_shutdown.js
new file mode 100644
index 00000000000..a29ee85c96d
--- /dev/null
+++ b/jstests/replsets/tenant_migration_donor_failover_and_shutdown.js
@@ -0,0 +1,141 @@
+/**
+ * Tests that the migration is interrupted successfully on stepdown and shutdown.
+ *
+ * @tags: [requires_fcv_47, incompatible_with_eft]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+
+const kMaxSleepTimeMS = 1000;
+
+/**
+ * Runs the donorStartMigration command to start a migration, and interrupts the migration on the
+ * donor using the 'interruptFunc', and asserts that the command either succeeds or fails with an
+ * expected error.
+ */
+function testDonorStartMigrationInterrupt(interruptFunc, isExpectedErrorFunc) {
+ const donorRst = new ReplSetTest(
+ {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
+ const recipientRst = new ReplSetTest({
+ nodes: 1,
+ name: "recipientRst",
+ nodeOptions: {setParameter: {enableTenantMigrations: true}}
+ });
+
+ donorRst.startSet();
+ donorRst.initiate();
+
+ recipientRst.startSet();
+ recipientRst.initiate();
+
+ const donorPrimary = donorRst.getPrimary();
+
+ const migrationId = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+ dbPrefix: "testDbPrefix",
+ readPreference: {mode: "primary"},
+ };
+
+ let migrationThread =
+ new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+ migrationThread.start();
+ sleep(Math.random() * kMaxSleepTimeMS);
+ interruptFunc(donorRst);
+ migrationThread.join();
+
+ const res = migrationThread.returnData();
+ assert(res.ok || isExpectedErrorFunc(res.code), tojson(res));
+
+ donorRst.stopSet();
+ recipientRst.stopSet();
+}
+
+/**
+ * Starts a migration and waits for it to commit, then runs the donorForgetMigration, and interrupts
+ * the donor using the 'interruptFunc', and asserts that the command either succeeds or fails with
+ * an expected error.
+ */
+function testDonorForgetMigrationInterrupt(interruptFunc, isExpectedErrorFunc) {
+ const donorRst = new ReplSetTest(
+ {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
+ const recipientRst = new ReplSetTest({
+ nodes: 1,
+ name: "recipientRst",
+ nodeOptions: {setParameter: {enableTenantMigrations: true}}
+ });
+
+ donorRst.startSet();
+ donorRst.initiate();
+
+ recipientRst.startSet();
+ recipientRst.initiate();
+
+ const donorPrimary = donorRst.getPrimary();
+
+ const migrationId = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+ dbPrefix: "testDbPrefix",
+ readPreference: {mode: "primary"},
+ };
+
+ assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
+ let forgetMigrationThread = new Thread(
+ TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString);
+ forgetMigrationThread.start();
+ sleep(Math.random() * kMaxSleepTimeMS);
+ interruptFunc(donorRst);
+ forgetMigrationThread.join();
+
+ const res = forgetMigrationThread.returnData();
+ assert(res.ok || isExpectedErrorFunc(res.code), tojson(res));
+
+ donorRst.stopSet();
+ recipientRst.stopSet();
+}
+
+(() => {
+ jsTest.log("Test that the donorStartMigration command is interrupted successfully on stepdown");
+ testDonorStartMigrationInterrupt((donorRst) => {
+ assert.commandWorked(
+ donorRst.getPrimary().adminCommand({replSetStepDown: 1000, force: true}));
+ }, (errorCode) => ErrorCodes.isNotPrimaryError(errorCode));
+})();
+
+(() => {
+ jsTest.log("Test that the donorStartMigration command is interrupted successfully on shutdown");
+ testDonorStartMigrationInterrupt(
+ (donorRst) => {
+ donorRst.stopSet();
+ },
+ (errorCode) =>
+ ErrorCodes.isNotPrimaryError(errorCode) || ErrorCodes.isShutdownError(errorCode));
+})();
+
+(() => {
+ jsTest.log("Test that the donorForgetMigration is interrupted successfully on stepdown");
+ testDonorForgetMigrationInterrupt((donorRst) => {
+ assert.commandWorked(
+ donorRst.getPrimary().adminCommand({replSetStepDown: 1000, force: true}));
+ }, (errorCode) => ErrorCodes.isNotPrimaryError(errorCode));
+})();
+
+(() => {
+ jsTest.log("Test that the donorForgetMigration is interrupted successfully on shutdown");
+ testDonorForgetMigrationInterrupt(
+ (donorRst) => {
+ donorRst.stopSet();
+ },
+ (errorCode) =>
+ ErrorCodes.isNotPrimaryError(errorCode) || ErrorCodes.isShutdownError(errorCode));
+})();
+})();
diff --git a/jstests/replsets/tenant_migration_donor_retry.js b/jstests/replsets/tenant_migration_donor_retry.js
new file mode 100644
index 00000000000..effedf7495f
--- /dev/null
+++ b/jstests/replsets/tenant_migration_donor_retry.js
@@ -0,0 +1,278 @@
+/**
+ * Tests that the donor retries its steps until success, or it gets an error that should lead to
+ * an abort decision.
+ *
+ * @tags: [requires_fcv_47, incompatible_with_eft]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+
+const kDBPrefix = "testDb";
+let testNum = 0;
+const kConfigDonorsNS = "config.tenantMigrationDonors";
+
+function makeDbPrefix() {
+ return kDBPrefix + testNum++;
+}
+
+/**
+ * Starts a migration from 'donorRst' to 'recipientRst', uses failCommand to force the
+ * recipientSyncData command to fail with the given 'errorCode', and asserts the donor does not
+ * retry on that error and aborts the migration.
+ *
+ * TODO: This function should be changed to testDonorRetryRecipientSyncDataCmdOnError once there is
+ * a way to differentiate between local and remote stepdown/shutdown error.
+ */
+function testMigrationAbortsOnRecipientSyncDataCmdError(
+ donorRst, recipientRst, errorCode, failMode) {
+ const donorPrimary = donorRst.getPrimary();
+ const recipientPrimary = recipientRst.getPrimary();
+
+ const migrationId = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+        dbPrefix: makeDbPrefix(),
+ readPreference: {mode: "primary"},
+ };
+
+ let fp = configureFailPoint(recipientPrimary,
+ "failCommand",
+ {
+ failInternalCommands: true,
+ errorCode: errorCode,
+ failCommands: ["recipientSyncData"],
+ },
+ failMode);
+
+ let migrationThread =
+ new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+ migrationThread.start();
+
+ // Verify that the command failed.
+ const times = failMode.times ? failMode.times : 1;
+ for (let i = 0; i < times; i++) {
+ fp.wait();
+ }
+ fp.off();
+
+ migrationThread.join();
+ assert.commandFailedWithCode(migrationThread.returnData(), ErrorCodes.TenantMigrationAborted);
+
+ return migrationId;
+}
+
+/**
+ * Starts a migration from 'donorRst' to 'recipientRst', uses failCommand to force the
+ * recipientForgetMigration command to fail with the given 'errorCode', and asserts the donor does
+ * not retry on that error and aborts the migration.
+ *
+ * TODO: This function should be changed to testDonorRetryRecipientForgetMigrationCmdOnError once
+ * there is a way to differentiate between local and remote stepdown/shutdown error.
+ */
+function testMigrationAbortsOnRecipientForgetMigrationCmdError(donorRst, recipientRst, errorCode) {
+ const donorPrimary = donorRst.getPrimary();
+ const recipientPrimary = recipientRst.getPrimary();
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+
+ const migrationId = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+ dbPrefix: makeDbPrefix(),
+ readPreference: {mode: "primary"},
+ };
+
+ let fp = configureFailPoint(recipientPrimary,
+ "failCommand",
+ {
+ failInternalCommands: true,
+ errorCode: errorCode,
+ failCommands: ["recipientForgetMigration"],
+ },
+ {times: 1});
+
+ assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
+ let forgetMigrationThread = new Thread(
+ TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString);
+ forgetMigrationThread.start();
+
+ // Verify that the initial recipientForgetMigration command failed.
+ fp.wait();
+
+ forgetMigrationThread.join();
+ assert.commandFailedWithCode(forgetMigrationThread.returnData(), errorCode);
+ fp.off();
+
+ const donorStateDoc = configDonorsColl.findOne({_id: migrationId});
+ assert.eq("committed", donorStateDoc.state);
+ assert(donorStateDoc.expireAt);
+}
+
+const donorRst = new ReplSetTest(
+ {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
+const recipientRst = new ReplSetTest(
+ {nodes: 1, name: "recipientRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
+
+donorRst.startSet();
+donorRst.initiate();
+
+recipientRst.startSet();
+recipientRst.initiate();
+
+const donorPrimary = donorRst.getPrimary();
+
+(() => {
+ jsTest.log(
+ "Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" +
+ " on recipient stepdown errors");
+
+ const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
+ donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {times: 1});
+
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
+ assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+})();
+
+(() => {
+ jsTest.log(
+ "Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" +
+ " on recipient shutdown errors");
+
+ const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
+ donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {times: 1});
+
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
+ assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+})();
+
+(() => {
+ jsTest.log(
+ "Test that the donor does not retry recipientSyncData (with returnAfterReachingTimestamp) " +
+ "on stepdown errors");
+
+ const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
+ donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {skip: 1});
+
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
+ assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+})();
+
+(() => {
+ jsTest.log(
+ "Test that the donor does not retry recipientSyncData (with returnAfterReachingTimestamp) " +
+ "on recipient shutdown errors");
+
+ const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
+ donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {skip: 1});
+
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
+ assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+})();
+
+(() => {
+ jsTest.log("Test that the donor does not retry recipientForgetMigration on stepdown errors");
+ testMigrationAbortsOnRecipientForgetMigrationCmdError(
+ donorRst, recipientRst, ErrorCodes.NotWritablePrimary);
+})();
+
+(() => {
+ jsTest.log("Test that the donor does not retry recipientForgetMigration on shutdown errors");
+
+ testMigrationAbortsOnRecipientForgetMigrationCmdError(
+ donorRst, recipientRst, ErrorCodes.ShutdownInProgress);
+})();
+
+// Each donor state doc is updated three times throughout the lifetime of a tenant migration:
+// - Set the "state" to "blocking"
+// - Set the "state" to "commit"/"abort"
+// - Set the "expireAt" to make the doc garbage collectable by the TTL index.
+const kTotalStateDocUpdates = 3;
+const kWriteErrorTimeMS = 50;
+
+(() => {
+ jsTest.log("Test that the donor retries state doc insert on retriable errors");
+
+ const migrationId = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+ dbPrefix: makeDbPrefix(),
+ readPreference: {mode: "primary"},
+ };
+
+ let fp = configureFailPoint(donorPrimary, "failCollectionInserts", {
+ collectionNS: kConfigDonorsNS,
+ });
+
+ let migrationThread =
+ new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+ migrationThread.start();
+
+ // Make the insert keep failing for some time.
+ fp.wait();
+ sleep(kWriteErrorTimeMS);
+ fp.off();
+
+ migrationThread.join();
+ assert.commandWorked(migrationThread.returnData());
+
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ assert.eq("committed", configDonorsColl.findOne({_id: migrationId}).state);
+})();
+
+(() => {
+ jsTest.log("Test that the donor retries state doc update on retriable errors");
+
+ const migrationId = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+ dbPrefix: kDBPrefix + "RetryOnStateDocUpdateError",
+ readPreference: {mode: "primary"},
+ };
+
+ // Use a random number of skips to fail a random update to config.tenantMigrationDonors.
+ let fp = configureFailPoint(donorPrimary,
+ "failCollectionUpdates",
+ {
+ collectionNS: kConfigDonorsNS,
+ },
+ {skip: Math.floor(Math.random() * kTotalStateDocUpdates)});
+
+ let startMigrationThread =
+ new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+ let forgetMigrationThread = new Thread(
+ TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString);
+ startMigrationThread.start();
+ forgetMigrationThread.start();
+
+ // Make the update keep failing for some time.
+ fp.wait();
+ sleep(kWriteErrorTimeMS);
+ fp.off();
+
+ startMigrationThread.join();
+ forgetMigrationThread.join();
+ assert.commandWorked(startMigrationThread.returnData());
+ assert.commandWorked(forgetMigrationThread.returnData());
+
+ const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ const donorStateDoc = configDonorsColl.findOne({_id: migrationId});
+ assert.eq("committed", donorStateDoc.state);
+ assert(donorStateDoc.expireAt);
+})();
+
+donorRst.stopSet();
+recipientRst.stopSet();
+})();
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index 48787ac019a..0404c528f59 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -16,6 +16,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
let expectedNumRecipientSyncDataCmdSent = 0;
let expectedNumRecipientForgetMigrationCmdSent = 0;
@@ -51,14 +52,6 @@ function testDonorForgetMigration(donorRst, recipientRst, migrationId, dbPrefix)
assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0);
}
-// An object that mirrors the access states for the TenantMigrationAccessBlocker.
-const accessState = {
- kAllow: 0,
- kBlockingWrites: 1,
- kBlockingReadsAndWrites: 2,
- kReject: 3
-};
-
const donorRst = new ReplSetTest({
nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
name: "donor",
@@ -97,23 +90,15 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0});
(() => {
jsTest.log("Test the case where the migration commits");
const migrationId = UUID();
-
- function startMigration(host, recipientConnString, dbPrefix, migrationIdString) {
- const primary = new Mongo(host);
- assert.commandWorked(primary.adminCommand({
- donorStartMigration: 1,
- migrationId: UUID(migrationIdString),
- recipientConnectionString: recipientConnString,
- databasePrefix: dbPrefix,
- readPreference: {mode: "primary"}
- }));
- }
-
- let migrationThread = new Thread(startMigration,
- donorPrimary.host,
- kRecipientConnString,
- kDBPrefix,
- extractUUIDFromObject(migrationId));
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ recipientConnString: recipientRst.getURL(),
+ dbPrefix: kDBPrefix,
+ readPreference: {mode: "primary"},
+ };
+
+ let migrationThread =
+ new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
migrationThread.start();
@@ -121,7 +106,7 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0});
blockingFp.wait();
let mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtab[kDBPrefix].access, accessState.kBlockingReadsAndWrites);
+ assert.eq(mtab[kDBPrefix].access, TenantMigrationUtil.accessState.kBlockingReadsAndWrites);
assert(mtab[kDBPrefix].blockTimestamp);
let donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix});
@@ -134,16 +119,18 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0});
blockingFp.off();
migrationThread.join();
- mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtab[kDBPrefix].access, accessState.kReject);
- assert(mtab[kDBPrefix].commitOrAbortOpTime);
-
donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix});
let commitOplogEntry =
donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "committed");
assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
+ assert.soon(() => {
+ mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
+ return mtab[kDBPrefix].access === TenantMigrationUtil.accessState.kReject;
+ });
+ assert(mtab[kDBPrefix].commitOrAbortOpTime);
+
expectedNumRecipientSyncDataCmdSent += 2;
const recipientSyncDataMetrics =
recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData;
@@ -168,16 +155,19 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0});
ErrorCodes.TenantMigrationAborted);
abortFp.off();
- const mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtab[kDBPrefix].access, accessState.kAllow);
- assert(!mtab[kDBPrefix].commitOrAbortOpTime);
-
const donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix});
const abortOplogEntry =
donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "aborted");
assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts);
+ let mtab;
+ assert.soon(() => {
+ mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
+ return mtab[kDBPrefix].access === TenantMigrationUtil.accessState.kAllow;
+ });
+ assert(!mtab[kDBPrefix].commitOrAbortOpTime);
+
expectedNumRecipientSyncDataCmdSent += 2;
const recipientSyncDataMetrics =
recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData;
diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js
index 33f1c97b878..e56b12dbd05 100644
--- a/jstests/replsets/writes_during_tenant_migration.js
+++ b/jstests/replsets/writes_during_tenant_migration.js
@@ -14,6 +14,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
const donorRst = new ReplSetTest(
{nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}});
@@ -260,6 +261,15 @@ function testWriteIsAcceptedIfSentAfterMigrationHasAborted(testCase, testOpts) {
ErrorCodes.TenantMigrationAborted);
abortFp.off();
+ // Wait until the in-memory migration state is updated after the migration has majority
+ // committed the abort decision. Otherwise, the command below is expected to block and then get
+ // rejected.
+ assert.soon(() => {
+ const mtab =
+ testOpts.primaryDB.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
+ return mtab[dbPrefix].access === TenantMigrationUtil.accessState.kAllow;
+ });
+
runCommand(testOpts);
testCase.assertCommandSucceeded(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
@@ -327,7 +337,7 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te
* Tests that the donor blocks writes that are executed in the blocking state and rejects them after
* the migration aborts.
*/
-function testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts(testCase, testOpts) {
+function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, testOpts) {
let blockingFp =
configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts");
let abortFp = configureFailPoint(testOpts.primaryDB, "abortTenantMigrationAfterBlockingStarts");
@@ -844,7 +854,7 @@ const testFuncs = {
inAborted: testWriteIsAcceptedIfSentAfterMigrationHasAborted,
inBlocking: testWriteBlocksIfMigrationIsInBlocking,
inBlockingThenCommitted: testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits,
- inBlockingThenAborted: testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts
+ inBlockingThenAborted: testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts
};
for (const [testName, testFunc] of Object.entries(testFuncs)) {
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
index 5478df86f57..0cf8d415ac9 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
@@ -88,7 +88,7 @@ public:
TenantMigrationDonorService::Instance::getOrCreate(donorService, donorStateDoc);
uassertStatusOK(donor->checkIfOptionsConflict(donorStateDoc));
- donor->getDecisionFuture().get();
+ donor->getDecisionFuture().get(opCtx);
}
void doCheckAuthorization(OperationContext* opCtx) const {}
@@ -182,7 +182,7 @@ public:
donor);
donor.get().get()->onReceiveDonorForgetMigration();
- donor.get().get()->getCompletionFuture().get();
+ donor.get().get()->getCompletionFuture().get(opCtx);
}
private:
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 99b7db99e5f..c4d20c00f6a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1328,6 +1328,7 @@ env.Library(
'tenant_migration_donor_service.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/util/future_util',
'primary_only_service',
'repl_server_parameters',
'tenant_migration_donor',
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index a7bc9ad78b4..1fe5e546783 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/future_util.h"
namespace mongo {
@@ -62,6 +63,21 @@ std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlocker(
.getTenantMigrationAccessBlockerForDbPrefix(dbPrefix);
}
+bool shouldStopInsertingDonorStateDoc(Status status) {
+ return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress ||
+ ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status);
+}
+
+bool shouldStopUpdatingDonorStateDoc(Status status) {
+ return status.isOK() || ErrorCodes::isShutdownError(status) ||
+ ErrorCodes::isNotPrimaryError(status);
+}
+
+bool shouldStopSendingRecipientCommand(Status status) {
+ return status.isOK() || ErrorCodes::isShutdownError(status) ||
+ ErrorCodes::isNotPrimaryError(status);
+}
+
} // namespace
TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext,
@@ -113,170 +129,212 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) {
}
}
-repl::OpTime TenantMigrationDonorService::Instance::_insertStateDocument() {
- const auto stateDocBson = _stateDoc.toBSON();
-
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
- DBDirectClient dbClient(opCtx);
-
- const auto commandResponse = dbClient.runCommand([&] {
- write_ops::Insert insertOp(_stateDocumentsNS);
- insertOp.setDocuments({stateDocBson});
- return insertOp.serialize({});
- }());
- const auto commandReply = commandResponse->getCommandReply();
- uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
-
- return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDocument(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ return AsyncTry([this] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ DBDirectClient dbClient(opCtx);
+
+ auto commandResponse = dbClient.runCommand([&] {
+ write_ops::Update updateOp(_stateDocumentsNS);
+ auto updateModification =
+ write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON());
+ write_ops::UpdateOpEntry updateEntry(
+ BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()),
+ updateModification);
+ updateEntry.setMulti(false);
+ updateEntry.setUpsert(true);
+ updateOp.setUpdates({updateEntry});
+
+ return updateOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ })
+ .until([](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
+ })
+ .on(**executor);
}
-repl::OpTime TenantMigrationDonorService::Instance::_updateStateDocument(
+ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDocument(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
const TenantMigrationDonorStateEnum nextState) {
- boost::optional<repl::OpTime> updateOpTime;
-
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- uassertStatusOK(
- writeConflictRetry(opCtx, "updateStateDoc", _stateDocumentsNS.ns(), [&]() -> Status {
- AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
-
- if (!collection) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream() << _stateDocumentsNS.ns() << " does not exist");
- }
-
- WriteUnitOfWork wuow(opCtx);
-
- const auto originalStateDocBson = _stateDoc.toBSON();
- const auto originalRecordId = Helpers::findOne(
- opCtx, collection.getCollection(), originalStateDocBson, false /* requireIndex */);
- const auto originalSnapshot =
- Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson);
- invariant(!originalRecordId.isNull());
-
- // Reserve an opTime for the write.
- auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
-
- // Update the state.
- _stateDoc.setState(nextState);
- switch (nextState) {
- case TenantMigrationDonorStateEnum::kBlocking:
- _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp());
- break;
- case TenantMigrationDonorStateEnum::kCommitted:
- case TenantMigrationDonorStateEnum::kAborted:
- _stateDoc.setCommitOrAbortOpTime(oplogSlot);
- break;
- default:
- MONGO_UNREACHABLE;
- }
- const auto updatedStateDocBson = _stateDoc.toBSON();
-
- CollectionUpdateArgs args;
- args.criteria = BSON("_id" << _stateDoc.getId());
- args.oplogSlot = oplogSlot;
- args.update = updatedStateDocBson;
-
- collection->updateDocument(opCtx,
- originalRecordId,
- originalSnapshot,
- updatedStateDocBson,
- false,
- nullptr /* OpDebug* */,
- &args);
-
- wuow.commit();
-
- updateOpTime = oplogSlot;
- return Status::OK();
- }));
-
- invariant(updateOpTime);
- return updateOpTime.get();
+ const auto originalStateDocBson = _stateDoc.toBSON();
+
+ return AsyncTry([this, executor, nextState, originalStateDocBson] {
+ boost::optional<repl::OpTime> updateOpTime;
+
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ uassertStatusOK(writeConflictRetry(
+ opCtx, "updateStateDoc", _stateDocumentsNS.ns(), [&]() -> Status {
+ AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream()
+ << _stateDocumentsNS.ns() << " does not exist");
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+
+ const auto originalRecordId = Helpers::findOne(opCtx,
+ collection.getCollection(),
+ originalStateDocBson,
+ false /* requireIndex */);
+ const auto originalSnapshot = Snapshotted<BSONObj>(
+ opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson);
+ invariant(!originalRecordId.isNull());
+
+ // Reserve an opTime for the write.
+ auto oplogSlot =
+ repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
+
+ // Update the state.
+ _stateDoc.setState(nextState);
+ switch (nextState) {
+ case TenantMigrationDonorStateEnum::kBlocking:
+ _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp());
+ break;
+ case TenantMigrationDonorStateEnum::kCommitted:
+ case TenantMigrationDonorStateEnum::kAborted:
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ const auto updatedStateDocBson = _stateDoc.toBSON();
+
+ CollectionUpdateArgs args;
+ args.criteria = BSON("_id" << _stateDoc.getId());
+ args.oplogSlot = oplogSlot;
+ args.update = updatedStateDocBson;
+
+ collection->updateDocument(opCtx,
+ originalRecordId,
+ originalSnapshot,
+ updatedStateDocBson,
+ false,
+ nullptr /* OpDebug* */,
+ &args);
+
+ wuow.commit();
+
+ updateOpTime = oplogSlot;
+ return Status::OK();
+ }));
+
+ invariant(updateOpTime);
+ return updateOpTime.get();
+ })
+ .until([](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus());
+ })
+ .on(**executor);
}
-repl::OpTime TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable() {
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
- DBDirectClient dbClient(opCtx);
-
- _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
- Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
-
- auto commandResponse = dbClient.runCommand([&] {
- write_ops::Update updateOp(_stateDocumentsNS);
- auto updateModification =
- write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON());
- write_ops::UpdateOpEntry updateEntry(
- BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()),
- updateModification);
- updateEntry.setMulti(false);
- updateEntry.setUpsert(false);
- updateOp.setUpdates({updateEntry});
-
- return updateOp.serialize({});
- }());
-
- const auto commandReply = commandResponse->getCommandReply();
- uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
-
- return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ExecutorFuture<repl::OpTime>
+TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ return AsyncTry([this] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ DBDirectClient dbClient(opCtx);
+
+ _stateDoc.setExpireAt(
+ _serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
+
+ auto commandResponse = dbClient.runCommand([&] {
+ write_ops::Update updateOp(_stateDocumentsNS);
+ auto updateModification =
+ write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON());
+ write_ops::UpdateOpEntry updateEntry(
+ BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()),
+ updateModification);
+ updateEntry.setMulti(false);
+ updateEntry.setUpsert(false);
+ updateOp.setUpdates({updateEntry});
+
+ return updateOp.serialize({});
+ }());
+
+ const auto commandReply = commandResponse->getCommandReply();
+ uassertStatusOK(getStatusFromWriteCommandReply(commandReply));
+
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ })
+ .until([](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus());
+ })
+ .on(**executor);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime) {
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) {
return WaitForMajorityService::get(_serviceContext)
.waitUntilMajority(std::move(opTime))
.thenRunOn(**executor);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient(
- OperationContext* opCtx,
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter,
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const BSONObj& cmdObj) {
- HostAndPort recipientHost =
- uassertStatusOK(recipientTargeter->findHost(opCtx, ReadPreferenceSetting()));
-
- executor::RemoteCommandRequest request(recipientHost,
- NamespaceString::kAdminDb.toString(),
- std::move(cmdObj),
- rpc::makeEmptyMetadata(),
- nullptr,
- kRecipientSyncDataTimeout);
-
- auto recipientSyncDataResponsePF =
- makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>();
- auto promisePtr = std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>(
- std::move(recipientSyncDataResponsePF.promise));
-
- auto scheduleResult =
- (**executor)->scheduleRemoteCommand(std::move(request), [promisePtr](const auto& args) {
- promisePtr->emplaceValue(args);
- });
-
- if (!scheduleResult.isOK()) {
- // Since the command failed to be scheduled, the callback above did not and will not run.
- // Thus, it is safe to fulfill the promise here without worrying about synchronizing access
- // with the executor's thread.
- promisePtr->setError(scheduleResult.getStatus());
- }
-
- return std::move(recipientSyncDataResponsePF.future)
- .thenRunOn(**executor)
- .then([this](auto args) -> Status {
- if (!args.response.status.isOK()) {
- return args.response.status;
- }
- return getStatusFromCommandResult(args.response.data);
- });
+ return AsyncTry([this, executor, recipientTargeterRS, cmdObj] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ HostAndPort recipientHost =
+ uassertStatusOK(recipientTargeterRS->findHost(opCtx, ReadPreferenceSetting()));
+
+ executor::RemoteCommandRequest request(recipientHost,
+ NamespaceString::kAdminDb.toString(),
+ std::move(cmdObj),
+ rpc::makeEmptyMetadata(),
+ nullptr,
+ kRecipientSyncDataTimeout);
+
+ auto recipientSyncDataResponsePF =
+ makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>();
+ auto promisePtr =
+ std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>(
+ std::move(recipientSyncDataResponsePF.promise));
+
+ auto scheduleResult =
+ (**executor)
+ ->scheduleRemoteCommand(std::move(request), [promisePtr](const auto& args) {
+ promisePtr->emplaceValue(args);
+ });
+
+ if (!scheduleResult.isOK()) {
+ // Since the command failed to be scheduled, the callback above did not and will
+ // not run. Thus, it is safe to fulfill the promise here without worrying about
+ // synchronizing access with the executor's thread.
+ promisePtr->setError(scheduleResult.getStatus());
+ }
+
+ return std::move(recipientSyncDataResponsePF.future)
+ .thenRunOn(**executor)
+ .then([this](auto args) -> Status {
+ if (!args.response.status.isOK()) {
+ return args.response.status;
+ }
+ return getStatusFromCommandResult(args.response.data);
+ });
+ })
+ .until([](Status status) { return shouldStopSendingRecipientCommand(status); })
+ .on(**executor);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter) {
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) {
if (skipSendingRecipientSyncDataCommand.shouldFail()) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -295,18 +353,14 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
return request.toBSON(BSONObj());
}());
- return _sendCommandToRecipient(opCtx, executor, recipientTargeter, cmdObj);
+ return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter) {
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- return _sendCommandToRecipient(opCtx,
- executor,
- recipientTargeter,
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) {
+ return _sendCommandToRecipient(executor,
+ recipientTargeterRS,
RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj()));
}
@@ -314,7 +368,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
auto recipientUri =
uassertStatusOK(MongoURI::parse(_stateDoc.getRecipientConnectionString().toString()));
- auto recipientTargeter = std::shared_ptr<RemoteCommandTargeterRS>(
+ auto recipientTargeterRS = std::shared_ptr<RemoteCommandTargeterRS>(
new RemoteCommandTargeterRS(recipientUri.getSetName(), recipientUri.getServers()),
[this, setName = recipientUri.getSetName()](RemoteCommandTargeterRS* p) {
ReplicaSetMonitor::remove(setName);
@@ -324,11 +378,12 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
return ExecutorFuture<void>(**executor)
.then([this, executor] {
// Enter "dataSync" state.
- const auto opTime = _insertStateDocument();
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _insertStateDocument(executor).then([this, executor](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ });
})
- .then([this, executor, recipientTargeter] {
- return _sendRecipientSyncDataCommand(executor, recipientTargeter.get());
+ .then([this, executor, recipientTargeterRS] {
+ return _sendRecipientSyncDataCommand(executor, recipientTargeterRS);
})
.then([this, executor] {
// Enter "blocking" state.
@@ -336,11 +391,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix());
invariant(mtab);
mtab->startBlockingWrites();
- const auto opTime = _updateStateDocument(TenantMigrationDonorStateEnum::kBlocking);
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kBlocking)
+ .then([this, executor](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ });
})
- .then([this, executor, recipientTargeter] {
- return _sendRecipientSyncDataCommand(executor, recipientTargeter.get());
+ .then([this, executor, recipientTargeterRS] {
+ return _sendRecipientSyncDataCommand(executor, recipientTargeterRS);
})
.then([this] {
auto opCtxHolder = cc().makeOperationContext();
@@ -360,24 +417,25 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
uasserted(ErrorCodes::InternalError, "simulate a tenant migration error");
});
})
- .then([this] {
+ .then([this, executor] {
// Enter "commit" state.
- _updateStateDocument(TenantMigrationDonorStateEnum::kCommitted);
- auto mtab =
- getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix());
- invariant(mtab);
- return mtab->onCompletion();
+ return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kCommitted)
+ .then([this, executor](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ });
})
.onError([this, executor](Status status) {
auto mtab =
getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix());
if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) {
- return SharedSemiFuture<void>(status);
+ return ExecutorFuture<void>(**executor, status);
} else {
// Enter "abort" state.
_abortReason.emplace(status);
- _updateStateDocument(TenantMigrationDonorStateEnum::kAborted);
- return mtab->onCompletion();
+ return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kAborted)
+ .then([this, executor](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ });
}
})
.onCompletion([this](Status status) {
@@ -396,14 +454,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
if (status.isOK()) {
- _decisionPromise.emplaceValue();
- } else {
+            // The migration committed or aborted successfully.
if (_abortReason) {
- status.addContext(str::stream()
- << "Tenant migration with id \"" << _stateDoc.getId()
- << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix()
- << "\" aborted due to " << _abortReason);
+ _decisionPromise.setError(
+ {ErrorCodes::TenantMigrationAborted,
+ str::stream() << "Tenant migration with id \"" << _stateDoc.getId()
+ << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix()
+ << "\" aborted due to " << _abortReason});
+ } else {
+ _decisionPromise.emplaceValue();
}
+ } else {
+ // There was a conflicting migration or this node is stepping down or shutting down.
_decisionPromise.setError(status);
}
})
@@ -412,13 +474,15 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
return _receiveDonorForgetMigrationPromise.getFuture();
})
.then([this, executor] {
- const auto opTime = _markStateDocumentAsGarbageCollectable();
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _markStateDocumentAsGarbageCollectable(executor).then(
+ [this, executor](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ });
})
- .then([this, executor, recipientTargeter] {
- return _sendRecipientForgetMigrationCommand(executor, recipientTargeter.get());
+ .then([this, executor, recipientTargeterRS] {
+ return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS);
})
- .onCompletion([this, executor](Status status) {
+ .onCompletion([this](Status status) {
LOGV2(4920400,
"Marked migration state as garbage collectable",
"migrationId"_attr = _stateDoc.getId(),
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 97ad6b3f4dc..af1409ab0db 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -100,7 +100,8 @@ public:
* Inserts the state document to _stateDocumentsNS and returns the opTime for the insert
* oplog entry.
*/
- repl::OpTime _insertStateDocument();
+ ExecutorFuture<repl::OpTime> _insertStateDocument(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor);
/**
* Updates the state document to have the given state. Then, persists the updated document
@@ -108,41 +109,44 @@ public:
* commitOrAbortTimestamp depending on the state. Returns the opTime for the update oplog
* entry.
*/
- repl::OpTime _updateStateDocument(const TenantMigrationDonorStateEnum nextState);
+ ExecutorFuture<repl::OpTime> _updateStateDocument(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const TenantMigrationDonorStateEnum nextState);
/**
- * Sets the "expireAt" time for the state document to be garbage collected.
+ * Sets the "expireAt" time for the state document to be garbage collected, and returns the
+ * the opTime for the write.
*/
- repl::OpTime _markStateDocumentAsGarbageCollectable();
+ ExecutorFuture<repl::OpTime> _markStateDocumentAsGarbageCollectable(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor);
/**
* Waits for given opTime to be majority committed.
*/
ExecutorFuture<void> _waitForMajorityWriteConcern(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime);
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime);
/**
* Sends the given command to the recipient replica set.
*/
ExecutorFuture<void> _sendCommandToRecipient(
- OperationContext* opCtx,
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter,
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const BSONObj& cmdObj);
/**
* Sends the recipientSyncData command to the recipient replica set.
*/
ExecutorFuture<void> _sendRecipientSyncDataCommand(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter);
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS);
/**
* Sends the recipientForgetMigration command to the recipient replica set.
*/
ExecutorFuture<void> _sendRecipientForgetMigrationCommand(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- RemoteCommandTargeter* recipientTargeter);
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS);
ServiceContext* _serviceContext;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 94331d29d1f..8b7b0e82f39 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -123,6 +123,7 @@ env.Library(
'$BUILD_DIR/mongo/db/rs_local_client',
'$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/idl/server_parameter',
+ '$BUILD_DIR/mongo/util/future_util',
'resharding_util',
],
)
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index a2df539fb88..fd6f45ef4ef 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -510,6 +510,16 @@ env.Benchmark(
],
)
+env.Library(
+ target='future_util',
+ source=[
+ 'future_util.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/executor/task_executor_interface'
+ ],
+)
+
env.Benchmark(
target='hash_table_bm',
source='hash_table_bm.cpp',
@@ -672,6 +682,7 @@ icuEnv.CppUnitTest(
'diagnostic_info' if get_option('use-diagnostic-latches') == 'on' else [],
'dns_query',
'fail_point',
+ 'future_util',
'icu',
'latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [],
'md5',
diff --git a/src/mongo/util/future_util.cpp b/src/mongo/util/future_util.cpp
new file mode 100644
index 00000000000..72517448cc8
--- /dev/null
+++ b/src/mongo/util/future_util.cpp
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/util/future_util.h"
+
+namespace mongo {
+
+ExecutorFuture<void> sleepUntil(std::shared_ptr<executor::TaskExecutor> executor,
+ const Date_t& date) {
+ auto [promise, future] = makePromiseFuture<void>();
+ auto taskCompletionPromise = std::make_shared<Promise<void>>(std::move(promise));
+
+ auto scheduledWorkHandle = executor->scheduleWorkAt(
+ date, [taskCompletionPromise](const executor::TaskExecutor::CallbackArgs& args) mutable {
+ if (args.status.isOK()) {
+ taskCompletionPromise->emplaceValue();
+ } else {
+ taskCompletionPromise->setError(args.status);
+ }
+ });
+
+ if (!scheduledWorkHandle.isOK()) {
+ taskCompletionPromise->setError(scheduledWorkHandle.getStatus());
+ }
+ return std::move(future).thenRunOn(executor);
+}
+
+ExecutorFuture<void> sleepFor(std::shared_ptr<executor::TaskExecutor> executor,
+ Milliseconds duration) {
+ return sleepUntil(executor, executor->now() + duration);
+}
+
+} // namespace mongo
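
The sleepUntil()/sleepFor() helpers above return an ExecutorFuture<void> that the task executor fulfills at the requested time, so callers can chain work without blocking a thread. A hedged usage sketch follows (the 500ms delay and the continuation are illustrative, not taken from this commit):

// Illustrative sketch only: schedule deferred work with the non-blocking sleep
// helper shown above. 'executor' is assumed to be a running TaskExecutor.
#include <memory>

#include "mongo/util/duration.h"
#include "mongo/util/future_util.h"

namespace mongo {

void runLater(std::shared_ptr<executor::TaskExecutor> executor) {
    sleepFor(executor, Milliseconds(500)).getAsync([](Status status) {
        if (status.isOK()) {
            // The executor reached the deadline; run the deferred work here.
        }
        // On error (e.g. executor shutdown), the deferred work is skipped.
    });
}

}  // namespace mongo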
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index 6c1d71ac134..06f035f8ef6 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -28,6 +28,7 @@
*/
#pragma once
+#include "mongo/executor/task_executor.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -36,32 +37,12 @@ namespace mongo {
* Returns a future which will be fulfilled at the given date.
*/
ExecutorFuture<void> sleepUntil(std::shared_ptr<executor::TaskExecutor> executor,
- const Date_t& date) {
- auto [promise, future] = makePromiseFuture<void>();
- auto taskCompletionPromise = std::make_shared<Promise<void>>(std::move(promise));
-
- auto scheduledWorkHandle = executor->scheduleWorkAt(
- date, [taskCompletionPromise](const executor::TaskExecutor::CallbackArgs& args) mutable {
- if (args.status.isOK()) {
- taskCompletionPromise->emplaceValue();
- } else {
- taskCompletionPromise->setError(args.status);
- }
- });
-
- if (!scheduledWorkHandle.isOK()) {
- taskCompletionPromise->setError(scheduledWorkHandle.getStatus());
- }
- return std::move(future).thenRunOn(executor);
-}
-
+ const Date_t& date);
/**
* Returns a future which will be fulfilled after the given duration.
*/
ExecutorFuture<void> sleepFor(std::shared_ptr<executor::TaskExecutor> executor,
- Milliseconds duration) {
- return sleepUntil(executor, executor->now() + duration);
-}
+ Milliseconds duration);
namespace future_util_details {