commit     3012da79e0b640bf2429f2b83a59ef1ba60271cc (patch)
tree       452e50910fe614f202ac38e156ee28cb280bae10
parent     6aeb26d871314453df56d0c534af8d1994e9786c (diff)
author     Cheahuychou Mao <cheahuychou.mao@mongodb.com>  2020-07-28 14:52:31 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-07-30 19:55:59 +0000

SERVER-49175 Make donorStartMigration write commitOrAbortOpTime on transitioning to commit or abort state
-rw-r--r--  jstests/replsets/reads_during_tenant_migration.js                266
-rw-r--r--  jstests/replsets/tenant_migration_donor_state_machine.js         109
-rw-r--r--  jstests/replsets/writes_during_tenant_migration.js                52
-rw-r--r--  src/mongo/db/commands/tenant_migration_cmds.cpp                    2
-rw-r--r--  src/mongo/db/repl/tenant_migration_access_blocker.h                2
-rw-r--r--  src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp   28
-rw-r--r--  src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h      6
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_util.cpp                161
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_util.h                    6
-rw-r--r--  src/mongo/db/repl/tenant_migration_state_machine.idl               5
10 files changed, 396 insertions(+), 241 deletions(-)
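
At a high level, the patch extends the donor's durable state machine: donorStartMigration already recorded a blockTimestamp when entering the "blocking" state, and it now also records a commitOrAbortOpTime when it transitions to "committed" or "aborted". A minimal mongo-shell sketch of a committed donor state document follows; the field names come from this patch, while the values and the {ts, t} shape of the optime are illustrative assumptions:

    // Hypothetical document in config.tenantMigrationDonors after a commit.
    // Only the field names are taken from this patch; values are made up.
    const donorDoc = {
        _id: UUID(),                               // migrationId from donorStartMigration
        databasePrefix: "testDb",                  // databases with this prefix are being migrated
        state: "committed",                        // "blocking" -> "committed" or "aborted"
        blockTimestamp: Timestamp(1596052000, 1),  // reserved oplog slot of the "blocking" update
        commitOrAbortOpTime: {ts: Timestamp(1596052001, 1), t: NumberLong(1)}  // new in this patch
    };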
diff --git a/jstests/replsets/reads_during_tenant_migration.js b/jstests/replsets/reads_during_tenant_migration.js
index b79e9f9f4b8..1e3c1e7e489 100644
--- a/jstests/replsets/reads_during_tenant_migration.js
+++ b/jstests/replsets/reads_during_tenant_migration.js
@@ -1,6 +1,6 @@
/**
- * Tests that causal reads are properly blocked or rejected if executed after the migration
- * transitions to the read blocking state.
+ * Tests that the donor blocks clusterTime reads that are executed while the migration is in
+ * the blocking state but does not block linearizable reads.
*
* @tags: [requires_fcv_46, requires_majority_read_concern]
*/
@@ -8,122 +8,162 @@
(function() {
'use strict';
-const rst = new ReplSetTest({nodes: 2});
-rst.startSet();
-rst.initiate();
-const primary = rst.getPrimary();
-
-const kDbPrefix = "testPrefix";
-const kDbName = kDbPrefix + "0";
-const kCollName = "testColl";
-const kMaxTimeMS = 3000;
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+const kMaxTimeMS = 5 * 1000;
const kRecipientConnString = "testConnString";
-const kMigrationId = UUID();
-
-const kTenantMigrationsColl = 'tenantMigrationDonors';
-
-assert.commandWorked(primary.adminCommand({
- donorStartMigration: 1,
- migrationId: kMigrationId,
- recipientConnectionString: kRecipientConnString,
- databasePrefix: kDbPrefix,
- readPreference: {mode: "primary"}
-}));
-
-// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of the
-// oplog on the secondary to ensure that snapshot reads on the secondary will have read timestamp
-// >= blockTimestamp.
-rst.awaitLastOpCommitted();
-
-jsTest.log(
- "Test that the donorStartMigration command correctly sets the durable migration state to blocking");
-
-const donorDoc = primary.getDB("config")[kTenantMigrationsColl].findOne();
-const oplogEntry =
- primary.getDB("local").oplog.rs.findOne({ns: `config.${kTenantMigrationsColl}`, op: "u"});
-
-assert.eq(donorDoc._id, kMigrationId);
-assert.eq(donorDoc.databasePrefix, kDbPrefix);
-assert.eq(donorDoc.state, "blocking");
-assert.eq(donorDoc.blockTimestamp, oplogEntry.ts);
-
-jsTest.log("Test atClusterTime and afterClusterTime reads");
-
-rst.nodes.forEach((node) => {
- assert.commandWorked(node.getDB(kDbName).runCommand({find: kCollName}));
-
- // Test snapshot reads with and without atClusterTime.
- assert.commandFailedWithCode(node.getDB(kDbName).runCommand({
- find: kCollName,
- readConcern: {
- level: "snapshot",
- atClusterTime: donorDoc.blockTimestamp,
+const kConfigDonorsNS = "config.tenantMigrationDonors";
+
+function startMigration(host, dbName, recipientConnString) {
+ const primary = new Mongo(host);
+ return primary.adminCommand({
+ donorStartMigration: 1,
+ migrationId: UUID(),
+ recipientConnectionString: recipientConnString,
+ databasePrefix: dbName,
+ readPreference: {mode: "primary"}
+ });
+}
+
+function runCommand(db, cmd, expectedError) {
+ const res = db.runCommand(cmd);
+ if (expectedError) {
+ assert.commandFailedWithCode(res, expectedError);
+ } else {
+ assert.commandWorked(res);
+ }
+}
+
+/**
+ * Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >=
+ * blockTimestamp but does not block linearizable reads.
+ */
+function testReadCommandWhenMigrationIsInBlocking(rst, testCase, dbName, collName) {
+ const primary = rst.getPrimary();
+
+ let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts");
+ let migrationThread = new Thread(startMigration, primary.host, dbName, kRecipientConnString);
+
+ // Wait for the migration to enter the blocking state.
+ migrationThread.start();
+ blockingFp.wait();
+
+ // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
+ // the oplog on all the secondaries to ensure that snapshot reads on the secondaries with
+ // unspecified atClusterTime have read timestamp >= blockTimestamp.
+ rst.awaitLastOpCommitted();
+
+ const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ const command = testCase.requiresReadTimestamp
+ ? testCase.command(collName, donorDoc.blockTimestamp)
+ : testCase.command(collName);
+ command.maxTimeMS = kMaxTimeMS;
+
+ const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+ nodes.forEach(node => {
+ const db = node.getDB(dbName);
+ runCommand(db, command, testCase.isLinearizableRead ? null : ErrorCodes.MaxTimeMSExpired);
+ });
+
+ blockingFp.off();
+ migrationThread.join();
+ assert.commandWorked(migrationThread.returnData());
+}
+
+const testCases = {
+ snapshotReadWithAtClusterTime: {
+ isSupportedOnSecondaries: true,
+ requiresReadTimestamp: true,
+ command: function(collName, readTimestamp) {
+ return {
+ find: collName,
+ readConcern: {
+ level: "snapshot",
+ atClusterTime: readTimestamp,
+ }
+ };
},
- maxTimeMS: kMaxTimeMS,
- }),
- ErrorCodes.MaxTimeMSExpired);
-
- assert.commandFailedWithCode(node.getDB(kDbName).runCommand({
- find: kCollName,
- readConcern: {
- level: "snapshot",
+ },
+ snapshotReadWithoutAtClusterTime: {
+ isSupportedOnSecondaries: true,
+ command: function(collName) {
+ return {
+ find: collName,
+ readConcern: {
+ level: "snapshot",
+ }
+ };
},
- maxTimeMS: kMaxTimeMS,
- }),
- ErrorCodes.MaxTimeMSExpired);
-
- // Test read with afterClusterTime.
- assert.commandFailedWithCode(node.getDB(kDbName).runCommand({
- find: kCollName,
- readConcern: {
- afterClusterTime: donorDoc.blockTimestamp,
+ },
+ snapshotReadWithAtClusterTimeInTxn: {
+ isSupportedOnSecondaries: false,
+ requiresReadTimestamp: true,
+ command: function(collName, readTimestamp) {
+ return {
+ find: collName,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false,
+ readConcern: {level: "snapshot", atClusterTime: readTimestamp}
+ };
+ }
+ },
+ snapshotReadWithoutAtClusterTimeInTxn: {
+ isSupportedOnSecondaries: false,
+ command: function(collName) {
+ return {
+ find: collName,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false,
+ readConcern: {level: "snapshot"}
+ };
+ }
+ },
+ readWithAfterClusterTime: {
+ isSupportedOnSecondaries: true,
+ requiresReadTimestamp: true,
+ command: function(collName, readTimestamp) {
+ return {
+ find: collName,
+ readConcern: {
+ afterClusterTime: readTimestamp,
+ }
+ };
},
- maxTimeMS: kMaxTimeMS,
- }),
- ErrorCodes.MaxTimeMSExpired);
-});
-
-// Test snapshot read with atClusterTime inside transaction.
-assert.commandFailedWithCode(primary.getDB(kDbName).runCommand({
- find: kCollName,
- lsid: {id: UUID()},
- txnNumber: NumberLong(0),
- startTransaction: true,
- autocommit: false,
- readConcern: {level: "snapshot", atClusterTime: donorDoc.blockTimestamp},
- maxTimeMS: kMaxTimeMS,
-}),
- ErrorCodes.MaxTimeMSExpired);
-
-// Test snapshot read without atClusterTime inside transaction.
-assert.commandFailedWithCode(primary.getDB(kDbName).runCommand({
- find: kCollName,
- lsid: {id: UUID()},
- txnNumber: NumberLong(0),
- startTransaction: true,
- autocommit: false,
- readConcern: {level: "snapshot"},
- maxTimeMS: kMaxTimeMS,
-}),
- ErrorCodes.MaxTimeMSExpired);
-
-jsTest.log("Test linearizable reads");
-
-// Test that linearizable reads are not blocked in the blocking state.
-assert.commandWorked(primary.getDB(kDbName).runCommand({
- find: kCollName,
- readConcern: {level: "linearizable"},
- maxTimeMS: kMaxTimeMS,
-}));
-
-// TODO (SERVER-49175): Uncomment this test case when committing is handled.
-// assert.commandFailedWithCode(primary.getDB(kDbName).runCommand({
-// find: kCollName,
-// readConcern: {level: "linearizable"},
-// maxTimeMS: kMaxTimeMS,
-// }),
-// ErrorCodes.TenantMigrationCommitted);
+ },
+ linearizableRead: {
+ isSupportedOnSecondaries: false,
+ isLinearizableRead: true,
+ command: function(collName) {
+ return {
+ find: collName,
+ readConcern: {level: "linearizable"},
+ };
+ }
+ }
+};
+
+const rst = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
+rst.startSet();
+rst.initiate();
+
+const kCollName = "testColl";
+
+// Run test cases.
+const testFuncs = {
+ inBlocking: testReadCommandWhenMigrationIsInBlocking,
+};
+
+for (const [testName, testFunc] of Object.entries(testFuncs)) {
+ for (const [commandName, testCase] of Object.entries(testCases)) {
+ let dbName = commandName + "-" + testName + "0";
+ testFunc(rst, testCase, dbName, kCollName);
+ }
+}
rst.stopSet();
})();
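
The reworked test is table-driven: each entry in testCases describes one read variant, and testReadCommandWhenMigrationIsInBlocking drives it through the blocking state. As a usage sketch, a hypothetical extra case (not part of this patch) would slot into the same table:

    // Hypothetical additional testCases entry, following the shape used above.
    const aggregateWithAfterClusterTime = {
        isSupportedOnSecondaries: true,
        requiresReadTimestamp: true,
        command: function(collName, readTimestamp) {
            return {
                aggregate: collName,
                pipeline: [{$match: {}}],
                cursor: {},
                readConcern: {afterClusterTime: readTimestamp}
            };
        }
    };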
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index 7e0b90eec3f..df9ce0cfcf1 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -1,12 +1,16 @@
/**
- * Tests that after donorStartCommand is run, that reads and writes should be blocked for the
- * migrating tenant.
+ * Tests that the TenantMigrationAccessBlocker and donor state document are updated correctly after
+ * the donorStartMigration command is run.
+ *
* @tags: [requires_fcv_46]
*/
(function() {
"use strict";
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+
// An object that mirrors the access states for the TenantMigrationAccessBlocker.
const accessState = {
kAllow: 0,
@@ -15,42 +19,87 @@ const accessState = {
kReject: 3
};
-const donorRst = new ReplSetTest({nodes: 1});
+const donorRst =
+ new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
const recipientRst = new ReplSetTest({nodes: 1});
donorRst.startSet();
donorRst.initiate();
const donorPrimary = donorRst.getPrimary();
+const kRecipientConnString = recipientRst.getURL();
+const kDBPrefix = 'testDb';
+const kConfigDonorsNS = "config.tenantMigrationDonors";
-const kMigrationId = new UUID();
-const kRecipientConnectionString = recipientRst.getURL();
+(() => {
+ // Test the case where the migration commits.
+ const dbName = kDBPrefix + "Commit";
-const kReadPreference = {
- mode: "primary"
-};
-const kDBPrefix = 'databaseABC';
-
-jsTest.log('Running donorStartMigration command.');
-assert.commandWorked(donorPrimary.adminCommand({
- donorStartMigration: 1,
- migrationId: kMigrationId,
- recipientConnectionString: kRecipientConnectionString,
- databasePrefix: kDBPrefix,
- readPreference: kReadPreference
-}));
-
-jsTest.log('Running the serverStatus command.');
-const tenantMigrationServerStatus =
- donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
-
-// The donorStartMigration does the blocking write after updating the in-memory
-// access state to kBlockingWrites, and on completing the write the access state is
-// updated to kBlockingReadsAndWrites. Since the command doesn't return until the
-// write is completed, the state is always kBlockingReadsAndWrites after the
-// command returns.
-assert.eq(tenantMigrationServerStatus[kDBPrefix].access, accessState.kBlockingReadsAndWrites);
-assert(tenantMigrationServerStatus[kDBPrefix].blockTimestamp);
+ function startMigration(host, recipientConnString, dbName) {
+ const primary = new Mongo(host);
+ assert.commandWorked(primary.adminCommand({
+ donorStartMigration: 1,
+ migrationId: UUID(),
+ recipientConnectionString: recipientConnString,
+ databasePrefix: dbName,
+ readPreference: {mode: "primary"}
+ }));
+ }
+
+ let migrationThread =
+ new Thread(startMigration, donorPrimary.host, kRecipientConnString, dbName);
+ let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
+ migrationThread.start();
+
+ // Wait for the migration to enter the blocking state.
+ blockingFp.wait();
+
+ let mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
+ assert.eq(mtab[dbName].access, accessState.kBlockingReadsAndWrites);
+ assert(mtab[dbName].blockTimestamp);
+
+ let donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ let blockOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne(
+ {ns: kConfigDonorsNS, op: "u", "o.databasePrefix": dbName});
+ assert.eq(donorDoc.state, "blocking");
+ assert.eq(donorDoc.blockTimestamp, blockOplogEntry.ts);
+
+ // Allow the migration to complete.
+ blockingFp.off();
+ migrationThread.join();
+
+ // TODO (SERVER-49176): test that mtab is updated correctly.
+
+ donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ let commitOplogEntry =
+ donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
+ assert.eq(donorDoc.state, "committed");
+ assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
+})();
+
+(() => {
+ // Test the case where the migration aborts.
+ const dbName = kDBPrefix + "Abort";
+
+ let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
+ assert.commandFailedWithCode(donorPrimary.adminCommand({
+ donorStartMigration: 1,
+ migrationId: UUID(),
+ recipientConnectionString: kRecipientConnString,
+ databasePrefix: dbName,
+ readPreference: {mode: "primary"}
+ }),
+ ErrorCodes.InternalError);
+ abortFp.off();
+
+ // TODO (SERVER-49176): test that mtab is updated correctly.
+
+ const donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+ const abortOplogEntry =
+ donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
+ assert.eq(donorDoc.state, "aborted");
+ assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts);
+})();
donorRst.stopSet();
})();
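
The access values asserted above come from the tenantMigrationAccessBlocker section that the donor appends to serverStatus, keyed by database prefix. A shell sketch of reading that section directly, assuming the donorPrimary and dbName bindings used in this test:

    // Sketch: inspect the donor's access blocker state for one database prefix.
    const status = assert.commandWorked(donorPrimary.adminCommand({serverStatus: 1}));
    const mtab = status.tenantMigrationAccessBlocker;  // one sub-document per blocked prefix
    if (mtab && mtab[dbName]) {
        jsTest.log("access=" + mtab[dbName].access +
                   ", blockTimestamp=" + tojson(mtab[dbName].blockTimestamp));
    }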
diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js
index 6904de7aae5..f34b08394bc 100644
--- a/jstests/replsets/writes_during_tenant_migration.js
+++ b/jstests/replsets/writes_during_tenant_migration.js
@@ -7,6 +7,9 @@
(function() {
'use strict';
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+
const kTestDoc = {
x: -1
};
@@ -26,14 +29,15 @@ const kMaxSize = 1024; // max size of capped collections.
const kTxnNumber = NumberLong(0);
const kRecipientConnString = "testConnString";
-function startMigration(primary, dbName) {
- assert.commandWorked(primary.adminCommand({
+function startMigration(host, dbName, recipientConnString) {
+ const primary = new Mongo(host);
+ return primary.adminCommand({
donorStartMigration: 1,
migrationId: UUID(),
- recipientConnectionString: kRecipientConnString,
+ recipientConnectionString: recipientConnString,
databasePrefix: dbName,
readPreference: {mode: "primary"}
- }));
+ });
}
function createCollectionAndInsertDocs(primaryDB, collName, isCapped, numDocs = kNumInitialDocs) {
@@ -127,6 +131,7 @@ function makeTestOptions(primary, testCase, dbName, collName, useTransaction, us
return {
primaryConn,
primaryDB,
+ primaryHost: useSession ? primaryConn.getClient().host : primaryConn.host,
runAgainstAdminDb: testCase.runAgainstAdminDb,
command,
dbName,
@@ -175,14 +180,35 @@ function runCommand(testOpts, expectedError) {
}
}
+/**
+ * Tests that the write succeeds when there is no migration.
+ */
function testWriteCommandSucceeded(testCase, testOpts) {
runCommand(testOpts);
testCase.assertCommandSucceeded(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
-function testWriteCommandBlocked(testCase, testOpts) {
- startMigration(testOpts.primaryDB, testOpts.dbName);
+/**
+ * Tests that the donor rejects writes that are executed in the blocking state.
+ */
+function testWriteCommandWhenMigrationIsInBlocking(testCase, testOpts) {
+ let blockingFp =
+ configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ let migrationThread =
+ new Thread(startMigration, testOpts.primaryHost, testOpts.dbName, kRecipientConnString);
+
+ // Run the command after the migration enters the blocking state.
+ migrationThread.start();
+ blockingFp.wait();
+ // TODO (SERVER-49181): assert that the command fails with MaxTimeMSExpired after the donor
+ // starts blocking writes instead of throwing an error.
runCommand(testOpts, ErrorCodes.TenantMigrationConflict);
+
+ // Allow the migration to complete.
+ blockingFp.off();
+ migrationThread.join();
+ assert.commandWorked(migrationThread.returnData());
+
testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
@@ -661,7 +687,6 @@ rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
-const kDbPrefix = "testDb";
const kCollName = "testColl";
// Validate test cases for all commands.
@@ -671,24 +696,23 @@ for (let command of Object.keys(testCases)) {
// Run test cases.
const testFuncs = {
- noTenantMigrationActive: testWriteCommandSucceeded, // verify that the test cases are correct.
- tenantMigrationInBlocking: testWriteCommandBlocked,
+ noMigration: testWriteCommandSucceeded, // verify that the test cases are correct.
+ inBlocking: testWriteCommandWhenMigrationIsInBlocking,
};
for (const [testName, testFunc] of Object.entries(testFuncs)) {
- for (let command of Object.keys(testCases)) {
- let testCase = testCases[command];
- let baseDbName = kDbPrefix + "-" + testName + "-" + command;
+ for (const [commandName, testCase] of Object.entries(testCases)) {
+ let baseDbName = commandName + "-" + testName + "0";
if (testCase.skip) {
- print("Skipping " + command + ": " + testCase.skip);
+ print("Skipping " + commandName + ": " + testCase.skip);
continue;
}
runTest(primary, testCase, testFunc, baseDbName + "Basic", kCollName);
// TODO (SERVER-49844): Test transactional writes during migration.
- if (testCase.isSupportedInTransaction && testName == "noTenantMigrationActive") {
+ if (testCase.isSupportedInTransaction && testName == "noMigration") {
runTest(
primary, testCase, testFunc, baseDbName + "Txn", kCollName, {useTransaction: true});
}
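
One pattern worth calling out: startMigration now takes a host string and opens its own connection, and makeTestOptions exposes primaryHost accordingly. The reason is that parallelTester Threads can only receive serializable arguments, not live connection objects. The same pattern in isolation (a sketch; the ping command stands in for real work):

    // Sketch: a worker thread reconnects from a host string, since live
    // connection objects cannot be passed across the Thread boundary.
    function workerBody(host) {
        const conn = new Mongo(host);  // fresh connection inside the thread
        return conn.adminCommand({ping: 1});
    }
    let worker = new Thread(workerBody, primary.host);
    worker.start();
    worker.join();
    assert.commandWorked(worker.returnData());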
diff --git a/src/mongo/db/commands/tenant_migration_cmds.cpp b/src/mongo/db/commands/tenant_migration_cmds.cpp
index 388e65db61d..c9004ab690f 100644
--- a/src/mongo/db/commands/tenant_migration_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_cmds.cpp
@@ -52,7 +52,7 @@ public:
requestBody.getDatabasePrefix().toString(),
TenantMigrationDonorStateEnum::kDataSync);
- tenant_migration::dataSync(opCtx, donorStateDoc);
+ tenant_migration::startMigration(opCtx, donorStateDoc);
}
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h
index 4e45976595e..4cd70117fcd 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.h
@@ -61,7 +61,7 @@ namespace mongo {
* }
*
* Writes call checkIfCanWriteOrThrow after being assigned an OpTime but before committing. The
- * method throws MigratingTenantConflict if writes are being blocked, which is caught in the loop.
+ * method throws TenantMigrationConflict if writes are being blocked, which is caught in the loop.
* The write then blocks until the migration either commits (in which case checkIfCanWriteOrBlock
* throws an error that causes the write to be rejected) or aborts (in which case
* checkIfCanWriteOrBlock returns successfully and the write is retried in the loop). This loop is
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp
index bc20d69dbe7..1d052e25bd1 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp
@@ -27,8 +27,6 @@
* it in the license file.
*/
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
-
#include "mongo/db/repl/tenant_migration_access_blocker_by_prefix.h"
#include "mongo/db/repl/tenant_migration_access_blocker.h"
@@ -45,10 +43,10 @@ void TenantMigrationAccessBlockerByPrefix::add(StringData dbPrefix,
std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
stdx::lock_guard<Latch> lg(_mutex);
- auto it = _migratingTenantAccessBlockers.find(dbPrefix);
- invariant(it == _migratingTenantAccessBlockers.end());
+ auto it = _tenantMigrationAccessBlockers.find(dbPrefix);
+ invariant(it == _tenantMigrationAccessBlockers.end());
- _migratingTenantAccessBlockers.emplace(dbPrefix, mtab);
+ _tenantMigrationAccessBlockers.emplace(dbPrefix, mtab);
}
@@ -58,10 +56,10 @@ void TenantMigrationAccessBlockerByPrefix::add(StringData dbPrefix,
void TenantMigrationAccessBlockerByPrefix::remove(StringData dbPrefix) {
stdx::lock_guard<Latch> lg(_mutex);
- auto it = _migratingTenantAccessBlockers.find(dbPrefix);
- invariant(it != _migratingTenantAccessBlockers.end());
+ auto it = _tenantMigrationAccessBlockers.find(dbPrefix);
+ invariant(it != _tenantMigrationAccessBlockers.end());
- _migratingTenantAccessBlockers.erase(it);
+ _tenantMigrationAccessBlockers.erase(it);
}
@@ -80,11 +78,11 @@ TenantMigrationAccessBlockerByPrefix::getTenantMigrationAccessBlocker(StringData
return dbName.startsWith(dbPrefix);
};
- auto it = std::find_if(_migratingTenantAccessBlockers.begin(),
- _migratingTenantAccessBlockers.end(),
+ auto it = std::find_if(_tenantMigrationAccessBlockers.begin(),
+ _tenantMigrationAccessBlockers.end(),
doesDBNameStartWithPrefix);
- if (it == _migratingTenantAccessBlockers.end()) {
+ if (it == _tenantMigrationAccessBlockers.end()) {
return nullptr;
} else {
return it->second;
@@ -92,7 +90,7 @@ TenantMigrationAccessBlockerByPrefix::getTenantMigrationAccessBlocker(StringData
}
/**
- * Iterates through each of the MigratingTenantAccessBlockers stored by the mapping
+ * Iterates through each of the TenantMigrationAccessBlockers stored by the mapping
* and appends the server status of each blocker to the BSONObjBuilder.
*/
void TenantMigrationAccessBlockerByPrefix::appendInfoForServerStatus(BSONObjBuilder* builder) {
@@ -105,9 +103,9 @@ void TenantMigrationAccessBlockerByPrefix::appendInfoForServerStatus(BSONObjBuil
builder->append(blocker.first, tenantBuilder.obj());
};
- std::for_each(_migratingTenantAccessBlockers.begin(),
- _migratingTenantAccessBlockers.end(),
+ std::for_each(_tenantMigrationAccessBlockers.begin(),
+ _tenantMigrationAccessBlockers.end(),
appendBlockerStatus);
}
-} // namespace mongo
\ No newline at end of file
+} // namespace mongo
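
Aside from the mechanical rename, note that getTenantMigrationAccessBlocker matches a database to a registered blocker with a plain prefix test, which is why the jstests above build database names as a prefix plus a suffix. The predicate, restated as a runnable shell sketch:

    // Sketch of the prefix rule in getTenantMigrationAccessBlocker.
    const doesDBNameStartWithPrefix = (dbName, dbPrefix) => dbName.startsWith(dbPrefix);
    assert(doesDBNameStartWithPrefix("testDbCommit", "testDb"));
    assert(!doesDBNameStartWithPrefix("otherDb", "testDb"));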
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h
index 861da8507c3..36b13d55ce2 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h
@@ -51,11 +51,11 @@ public:
StringData dbName);
private:
- using MigratingTenantAccessBlockersMap =
+ using TenantMigrationAccessBlockersMap =
StringMap<std::shared_ptr<TenantMigrationAccessBlocker>>;
Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationAccessBlockerByPrefix::_mutex");
- MigratingTenantAccessBlockersMap _migratingTenantAccessBlockers;
+ TenantMigrationAccessBlockersMap _tenantMigrationAccessBlockers;
};
-} // namespace mongo
\ No newline at end of file
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index 0576660ac7e..3f667242833 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -42,6 +42,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
@@ -49,6 +50,9 @@ namespace tenant_migration {
namespace {
+MONGO_FAIL_POINT_DEFINE(abortTenantMigrationAfterBlockingStarts);
+MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterBlockingStarts);
+
const char kThreadNamePrefix[] = "TenantMigrationWorker-";
const char kPoolName[] = "TenantMigrationWorkerThreadPool";
const char kNetName[] = "TenantMigrationWorkerNetwork";
@@ -72,6 +76,26 @@ std::shared_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(
}
/**
+ * Creates a TenantMigrationAccessBlocker, and makes it start blocking writes. Then adds it to
+ * the TenantMigrationAccessBlockerByPrefix.
+ */
+std::shared_ptr<TenantMigrationAccessBlocker> startBlockingWritesForTenant(
+ OperationContext* opCtx, const TenantMigrationDonorDocument& donorStateDoc) {
+ invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
+ auto serviceContext = opCtx->getServiceContext();
+
+ auto mtab = std::make_shared<TenantMigrationAccessBlocker>(
+ serviceContext, makeTenantMigrationExecutor(serviceContext).get());
+
+ mtab->startBlockingWrites();
+
+ auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(serviceContext);
+ mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
+
+ return mtab;
+}
+
+/**
* Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking
* state.
*/
@@ -104,33 +128,36 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
}
/**
- * Creates a TenantMigrationAccessBlocker, and makes it start blocking writes. Then adds it to
- * the TenantMigrationAccessBlockerByPrefix.
+ * Inserts the provided donor's state document to config.tenantMigrationDonors and waits for
+ * majority write concern.
*/
-void startBlockingWritesForTenant(OperationContext* opCtx,
- const TenantMigrationDonorDocument& donorStateDoc) {
- invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
- auto serviceContext = opCtx->getServiceContext();
-
- executor::TaskExecutor* mtabExecutor = makeTenantMigrationExecutor(serviceContext).get();
- auto mtab = std::make_shared<TenantMigrationAccessBlocker>(serviceContext, mtabExecutor);
-
- mtab->startBlockingWrites();
-
- auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(serviceContext);
- mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
+void insertDonorStateDocument(OperationContext* opCtx,
+ const TenantMigrationDonorDocument& donorStateDoc) {
+ PersistentTaskStore<TenantMigrationDonorDocument> store(
+ NamespaceString::kTenantMigrationDonorsNamespace);
+ try {
+ store.add(opCtx, donorStateDoc);
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
+ uasserted(
+ 4917300,
+ str::stream()
+ << "While attempting to persist the donor's state machine for tenant migration"
+ << ", found another document with the same migration id. Attempted migration: "
+ << donorStateDoc.toBSON());
+ }
}
/**
- * Updates the donor document to have state "blocking" and a blockingTimestamp.
- * Does the write by reserving an oplog slot beforehand and uses it as the blockingTimestamp.
+ * Updates the given donor's state document to have the given state. Then, persists the updated
+ * document by reserving an oplog slot beforehand and using it as the blockTimestamp or
+ * commitOrAbortOpTime depending on the state.
*/
-void updateDonorStateDocumentToBlocking(OperationContext* opCtx,
- const TenantMigrationDonorDocument& originalDonorStateDoc) {
-
+void updateDonorStateDocument(OperationContext* opCtx,
+ TenantMigrationDonorDocument& donorStateDoc,
+ const TenantMigrationDonorStateEnum nextState) {
uassertStatusOK(writeConflictRetry(
opCtx,
- "doStartBlockingWrite",
+ "updateDonorStateDoc",
NamespaceString::kTenantMigrationDonorsNamespace.ns(),
[&]() -> Status {
AutoGetCollection autoCollection(
@@ -144,28 +171,32 @@ void updateDonorStateDocumentToBlocking(OperationContext* opCtx,
}
WriteUnitOfWork wuow(opCtx);
+ const auto originalDonorStateDoc = donorStateDoc.toBSON();
const auto originalRecordId = Helpers::findOne(
- opCtx, collection, originalDonorStateDoc.toBSON(), false /* requireIndex */);
- const auto originalSnapshot = Snapshotted<BSONObj>(
- opCtx->recoveryUnit()->getSnapshotId(), originalDonorStateDoc.toBSON());
+ opCtx, collection, originalDonorStateDoc, false /* requireIndex */);
+ const auto originalSnapshot =
+ Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalDonorStateDoc);
invariant(!originalRecordId.isNull());
- // Reserve an opTime for the write and use it as the blockTimestamp for the migration.
+ // Reserve an opTime for the write.
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
-
- // Creates the new donor state document with the updated state and block time.
- // Then uses the updated document as the criteria (so its available in the oplog) when
- // creating the update arguments.
- const BSONObj updatedDonorStateDoc([&]() {
- TenantMigrationDonorDocument updatedDoc = originalDonorStateDoc;
- updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
- updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
- return updatedDoc.toBSON();
- }());
+ donorStateDoc.setState(nextState);
+ switch (nextState) {
+ case TenantMigrationDonorStateEnum::kBlocking:
+ donorStateDoc.setBlockTimestamp(oplogSlot.getTimestamp());
+ break;
+ case TenantMigrationDonorStateEnum::kCommitted:
+ case TenantMigrationDonorStateEnum::kAborted:
+ donorStateDoc.setCommitOrAbortOpTime(oplogSlot);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ const auto updatedDonorStateDoc = donorStateDoc.toBSON();
CollectionUpdateArgs args;
- args.criteria = BSON("_id" << originalDonorStateDoc.getId());
+ args.criteria = BSON("_id" << donorStateDoc.getId());
args.oplogSlot = oplogSlot;
args.update = updatedDonorStateDoc;
@@ -181,38 +212,46 @@ void updateDonorStateDocumentToBlocking(OperationContext* opCtx,
}));
}
-/**
- * Writes the provided donor's state document to config.tenantMigrationDonors and waits for majority
- * write concern.
- */
-void persistDonorStateDocument(OperationContext* opCtx,
- const TenantMigrationDonorDocument& donorStateDoc) {
- PersistentTaskStore<TenantMigrationDonorDocument> store(
- NamespaceString::kTenantMigrationDonorsNamespace);
- try {
- store.add(opCtx, donorStateDoc);
- } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
- uasserted(
- 4917300,
- str::stream()
- << "While attempting to persist the donor's state machine for tenant migration"
- << ", found another document with the same migration id. Attempted migration: "
- << donorStateDoc.toBSON());
- }
-}
} // namespace
+void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc) {
+ invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
-void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDonorStateDoc) {
- invariant(originalDonorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
- persistDonorStateDocument(opCtx, originalDonorStateDoc);
+ auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
+ auto mtab = mtabByPrefix.getTenantMigrationAccessBlocker(donorStateDoc.getDatabasePrefix());
- // Send recipientSyncData.
+ if (mtab) {
+ // There is already an active migration for the given database prefix.
+ return;
+ }
- startBlockingWritesForTenant(opCtx, originalDonorStateDoc);
+ try {
+ // Enter "dataSync" state.
+ insertDonorStateDocument(opCtx, donorStateDoc);
+
+ // TODO: Send recipientSyncData and wait for success response (i.e. the recipient's view of
+ // the data has become consistent).
+
+ // Enter "blocking" state.
+ auto mtab = startBlockingWritesForTenant(opCtx, donorStateDoc);
+
+ updateDonorStateDocument(opCtx, donorStateDoc, TenantMigrationDonorStateEnum::kBlocking);
+
+ // TODO: Send recipientSyncData with returnAfterReachingTimestamp set to the blockTimestamp.
+
+ pauseTenantMigrationAfterBlockingStarts.pauseWhileSet(opCtx);
+
+ if (abortTenantMigrationAfterBlockingStarts.shouldFail()) {
+ uasserted(ErrorCodes::InternalError, "simulate a tenant migration error");
+ }
+ } catch (DBException&) {
+ // Enter "abort" state.
+ updateDonorStateDocument(opCtx, donorStateDoc, TenantMigrationDonorStateEnum::kAborted);
+ throw;
+ }
- // Update the on-disk state of the migration to "blocking" state.
- updateDonorStateDocumentToBlocking(opCtx, originalDonorStateDoc);
+ // Enter "commit" state.
+ updateDonorStateDocument(opCtx, donorStateDoc, TenantMigrationDonorStateEnum::kCommitted);
}
void onDonorStateTransition(OperationContext* opCtx, const BSONObj& donorStateDoc) {
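
The core mechanism in updateDonorStateDocument is that the oplog slot is reserved before the update and stored inside the document itself: as blockTimestamp for the "blocking" state, and as commitOrAbortOpTime for "committed" and "aborted". The update is then written at exactly that slot, so the timestamp recorded in the document always equals the ts of the oplog entry that wrote it, which is the invariant the jstests above assert. A condensed shell sketch (donorPrimary and dbName assumed):

    // Sketch: the reserved slot appears both inside the state document and as
    // the ts of the oplog entry for the update that persisted it.
    const ns = "config.tenantMigrationDonors";
    const doc = donorPrimary.getCollection(ns).findOne({databasePrefix: dbName});
    const entry = donorPrimary.getDB("local").oplog.rs.findOne({ns: ns, op: "u", o: doc});
    if (doc.state === "blocking") {
        assert.eq(doc.blockTimestamp, entry.ts);
    } else {  // "committed" or "aborted"
        assert.eq(doc.commitOrAbortOpTime.ts, entry.ts);
    }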
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 1840c197117..d3902278cf5 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/commands/tenant_migration_cmds_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/executor/task_executor.h"
@@ -38,10 +39,9 @@ namespace mongo {
namespace tenant_migration {
/**
- * Sends recipientSyncData to the recipient until success and starts blocking writes and causal
- * reads.
+ * Starts a tenant migration as defined in the given donor's state document.
*/
-void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDonorStateDoc);
+void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc);
/**
* Updates the TenantMigrationAccessBlocker for the tenant migration represented by the given
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 49c7c300443..be891e14930 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -70,6 +70,11 @@ structs:
"The timestamp at which writes and causal reads against the databases
being migrated should start blocking."
optional: true
+ commitOrAbortOpTime:
+ type: optime
+ description:
+ "The opTime at which the donor's state document was set to 'committed' or 'aborted'."
+ optional: true
garbageCollect:
type: bool
description: "A boolean that determines whether the state machine should be deleted after a delay via the TTL monitor."