| | | |
|---|---|---|
| author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2020-07-28 14:52:31 +0000 |
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-30 19:55:59 +0000 |
| commit | 3012da79e0b640bf2429f2b83a59ef1ba60271cc (patch) | |
| tree | 452e50910fe614f202ac38e156ee28cb280bae10 | |
| parent | 6aeb26d871314453df56d0c534af8d1994e9786c (diff) | |
| download | mongo-3012da79e0b640bf2429f2b83a59ef1ba60271cc.tar.gz | |
SERVER-49175 Make donorStartMigration write commitOrAbortOpTime on transitioning to commit or abort state
10 files changed, 396 insertions, 241 deletions
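
Condensed from the updated `tenant_migration_donor_state_machine.js` test in the diff below, this sketch shows the end-to-end behavior the commit adds. It is illustrative only: `donorPrimary` is assumed to be the primary of a running `ReplSetTest`, and the recipient connection string is a placeholder, exactly as in the tests.

```js
// Illustrative sketch, not part of the diff: after this commit, when the donor
// transitions to "committed" (or "aborted"), it records the opTime of that
// state-document update in the new commitOrAbortOpTime field. Assumes
// `donorPrimary` is the primary of a running ReplSetTest.
const kConfigDonorsNS = "config.tenantMigrationDonors";

// With no failpoints enabled, donorStartMigration runs the migration to completion.
assert.commandWorked(donorPrimary.adminCommand({
    donorStartMigration: 1,
    migrationId: UUID(),
    recipientConnectionString: "testConnString",  // placeholder, as in the tests
    databasePrefix: "testDb",
    readPreference: {mode: "primary"}
}));

// The donor state document now carries commitOrAbortOpTime, whose timestamp
// matches the oplog entry for the update that set state to "committed".
const donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: "testDb"});
const commitOplogEntry =
    donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
assert.eq(donorDoc.state, "committed");
assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
```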
diff --git a/jstests/replsets/reads_during_tenant_migration.js b/jstests/replsets/reads_during_tenant_migration.js
index b79e9f9f4b8..1e3c1e7e489 100644
--- a/jstests/replsets/reads_during_tenant_migration.js
+++ b/jstests/replsets/reads_during_tenant_migration.js
@@ -1,6 +1,6 @@
 /**
- * Tests that causal reads are properly blocked or rejected if executed after the migration
- * transitions to the read blocking state.
+ * Tests that the donor blocks clusterTime reads that are executed while the migration is in
+ * the blocking state but does not block linearizable reads.
  *
  * @tags: [requires_fcv_46, requires_majority_read_concern]
  */
@@ -8,122 +8,162 @@
 (function() {
 'use strict';
 
-const rst = new ReplSetTest({nodes: 2});
-rst.startSet();
-rst.initiate();
-const primary = rst.getPrimary();
-
-const kDbPrefix = "testPrefix";
-const kDbName = kDbPrefix + "0";
-const kCollName = "testColl";
-const kMaxTimeMS = 3000;
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
 
+const kMaxTimeMS = 5 * 1000;
 const kRecipientConnString = "testConnString";
-const kMigrationId = UUID();
-
-const kTenantMigrationsColl = 'tenantMigrationDonors';
-
-assert.commandWorked(primary.adminCommand({
-    donorStartMigration: 1,
-    migrationId: kMigrationId,
-    recipientConnectionString: kRecipientConnString,
-    databasePrefix: kDbPrefix,
-    readPreference: {mode: "primary"}
-}));
-
-// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of the
-// oplog on the secondary to ensure that snapshot reads on the secondary will have read timestamp
-// >= blockTimestamp.
-rst.awaitLastOpCommitted();
-
-jsTest.log(
-    "Test that the donorStartMigration command correctly sets the durable migration state to blocking");
-
-const donorDoc = primary.getDB("config")[kTenantMigrationsColl].findOne();
-const oplogEntry =
-    primary.getDB("local").oplog.rs.findOne({ns: `config.${kTenantMigrationsColl}`, op: "u"});
-
-assert.eq(donorDoc._id, kMigrationId);
-assert.eq(donorDoc.databasePrefix, kDbPrefix);
-assert.eq(donorDoc.state, "blocking");
-assert.eq(donorDoc.blockTimestamp, oplogEntry.ts);
-
-jsTest.log("Test atClusterTime and afterClusterTime reads");
-
-rst.nodes.forEach((node) => {
-    assert.commandWorked(node.getDB(kDbName).runCommand({find: kCollName}));
-
-    // Test snapshot reads with and without atClusterTime.
-    assert.commandFailedWithCode(node.getDB(kDbName).runCommand({
-        find: kCollName,
-        readConcern: {
-            level: "snapshot",
-            atClusterTime: donorDoc.blockTimestamp,
+const kConfigDonorsNS = "config.tenantMigrationDonors";
+
+function startMigration(host, dbName, recipientConnString) {
+    const primary = new Mongo(host);
+    return primary.adminCommand({
+        donorStartMigration: 1,
+        migrationId: UUID(),
+        recipientConnectionString: recipientConnString,
+        databasePrefix: dbName,
+        readPreference: {mode: "primary"}
+    });
+}
+
+function runCommand(db, cmd, expectedError) {
+    const res = db.runCommand(cmd);
+    if (expectedError) {
+        assert.commandFailedWithCode(res, expectedError);
+    } else {
+        assert.commandWorked(res);
+    }
+}
+
+/**
+ * Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >=
+ * blockTimestamp but does not block linearizable reads.
+ */
+function testReadCommandWhenMigrationIsInBlocking(rst, testCase, dbName, collName) {
+    const primary = rst.getPrimary();
+
+    let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts");
+    let migrationThread = new Thread(startMigration, primary.host, dbName, kRecipientConnString);
+
+    // Wait for the migration to enter the blocking state.
+    migrationThread.start();
+    blockingFp.wait();
+
+    // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
+    // the oplog on all the secondaries to ensure that snapshot reads on the secondaries with
+    // unspecified atClusterTime have read timestamp >= blockTimestamp.
+    rst.awaitLastOpCommitted();
+
+    const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+    const command = testCase.requiresReadTimestamp
+        ? testCase.command(collName, donorDoc.blockTimestamp)
+        : testCase.command(collName);
+    command.maxTimeMS = kMaxTimeMS;
+
+    const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+    nodes.forEach(node => {
+        const db = node.getDB(dbName);
+        runCommand(db, command, testCase.isLinearizableRead ? null : ErrorCodes.MaxTimeMSExpired);
+    });
+
+    blockingFp.off();
+    migrationThread.join();
+    assert.commandWorked(migrationThread.returnData());
+}
+
+const testCases = {
+    snapshotReadWithAtClusterTime: {
+        isSupportedOnSecondaries: true,
+        requiresReadTimestamp: true,
+        command: function(collName, readTimestamp) {
+            return {
+                find: collName,
+                readConcern: {
+                    level: "snapshot",
+                    atClusterTime: readTimestamp,
+                }
+            };
         },
-        maxTimeMS: kMaxTimeMS,
-    }),
-                                 ErrorCodes.MaxTimeMSExpired);
-
-    assert.commandFailedWithCode(node.getDB(kDbName).runCommand({
-        find: kCollName,
-        readConcern: {
-            level: "snapshot",
+    },
+    snapshotReadWithoutAtClusterTime: {
+        isSupportedOnSecondaries: true,
+        command: function(collName) {
+            return {
+                find: collName,
+                readConcern: {
+                    level: "snapshot",
+                }
+            };
         },
-        maxTimeMS: kMaxTimeMS,
-    }),
-                                 ErrorCodes.MaxTimeMSExpired);
-
-    // Test read with afterClusterTime.
-    assert.commandFailedWithCode(node.getDB(kDbName).runCommand({
-        find: kCollName,
-        readConcern: {
-            afterClusterTime: donorDoc.blockTimestamp,
+    },
+    snapshotReadWithAtClusterTimeInTxn: {
+        isSupportedOnSecondaries: false,
+        requiresReadTimestamp: true,
+        command: function(collName, readTimestamp) {
+            return {
+                find: collName,
+                lsid: {id: UUID()},
+                txnNumber: NumberLong(0),
+                startTransaction: true,
+                autocommit: false,
+                readConcern: {level: "snapshot", atClusterTime: readTimestamp}
+            };
+        }
+    },
+    snapshotReadWithoutAtClusterTimeInTxn: {
+        isSupportedOnSecondaries: false,
+        command: function(collName) {
+            return {
+                find: collName,
+                lsid: {id: UUID()},
+                txnNumber: NumberLong(0),
+                startTransaction: true,
+                autocommit: false,
+                readConcern: {level: "snapshot"}
+            };
+        }
+    },
+    readWithAfterClusterTime: {
+        isSupportedOnSecondaries: true,
+        requiresReadTimestamp: true,
+        command: function(collName, readTimestamp) {
+            return {
+                find: collName,
+                readConcern: {
+                    afterClusterTime: readTimestamp,
+                }
+            };
         },
-        maxTimeMS: kMaxTimeMS,
-    }),
-                                 ErrorCodes.MaxTimeMSExpired);
-});
-
-// Test snapshot read with atClusterTime inside transaction.
-assert.commandFailedWithCode(primary.getDB(kDbName).runCommand({
-    find: kCollName,
-    lsid: {id: UUID()},
-    txnNumber: NumberLong(0),
-    startTransaction: true,
-    autocommit: false,
-    readConcern: {level: "snapshot", atClusterTime: donorDoc.blockTimestamp},
-    maxTimeMS: kMaxTimeMS,
-}),
-                             ErrorCodes.MaxTimeMSExpired);
-
-// Test snapshot read without atClusterTime inside transaction.
-assert.commandFailedWithCode(primary.getDB(kDbName).runCommand({
-    find: kCollName,
-    lsid: {id: UUID()},
-    txnNumber: NumberLong(0),
-    startTransaction: true,
-    autocommit: false,
-    readConcern: {level: "snapshot"},
-    maxTimeMS: kMaxTimeMS,
-}),
-                             ErrorCodes.MaxTimeMSExpired);
-
-jsTest.log("Test linearizable reads");
-
-// Test that linearizable reads are not blocked in the blocking state.
-assert.commandWorked(primary.getDB(kDbName).runCommand({
-    find: kCollName,
-    readConcern: {level: "linearizable"},
-    maxTimeMS: kMaxTimeMS,
-}));
-
-// TODO (SERVER-49175): Uncomment this test case when committing is handled.
-// assert.commandFailedWithCode(primary.getDB(kDbName).runCommand({
-//     find: kCollName,
-//     readConcern: {level: "linearizable"},
-//     maxTimeMS: kMaxTimeMS,
-// }),
-//                              ErrorCodes.TenantMigrationCommitted);
+    },
+    linearizableRead: {
+        isSupportedOnSecondaries: false,
+        isLinearizableRead: true,
+        command: function(collName) {
+            return {
+                find: collName,
+                readConcern: {level: "linearizable"},
+            };
+        }
+    }
+};
+
+const rst = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
+rst.startSet();
+rst.initiate();
+
+const kCollName = "testColl";
+
+// Run test cases.
+const testFuncs = {
+    inBlocking: testReadCommandWhenMigrationIsInBlocking,
+};
+
+for (const [testName, testFunc] of Object.entries(testFuncs)) {
+    for (const [commandName, testCase] of Object.entries(testCases)) {
+        let dbName = commandName + "-" + testName + "0";
+        testFunc(rst, testCase, dbName, kCollName);
+    }
+}
 
 rst.stopSet();
 })();
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index 7e0b90eec3f..df9ce0cfcf1 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -1,12 +1,16 @@
 /**
- * Tests that after donorStartCommand is run, that reads and writes should be blocked for the
- * migrating tenant.
+ * Tests that the TenantMigrationAccessBlocker and donor state document are updated correctly
+ * after the donorStartMigration command is run.
+ *
  * @tags: [requires_fcv_46]
  */
 
 (function() {
 "use strict";
 
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+
 // An object that mirrors the access states for the TenantMigrationAccessBlocker.
 const accessState = {
     kAllow: 0,
@@ -15,42 +19,87 @@ const accessState = {
     kReject: 3
 };
 
-const donorRst = new ReplSetTest({nodes: 1});
+const donorRst =
+    new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
 const recipientRst = new ReplSetTest({nodes: 1});
 
 donorRst.startSet();
 donorRst.initiate();
 
 const donorPrimary = donorRst.getPrimary();
+const kRecipientConnString = recipientRst.getURL();
+const kDBPrefix = 'testDb';
+const kConfigDonorsNS = "config.tenantMigrationDonors";
 
-const kMigrationId = new UUID();
-const kRecipientConnectionString = recipientRst.getURL();
+(() => {
+    // Test the case where the migration commits.
+    const dbName = kDBPrefix + "Commit";
 
-const kReadPreference = {
-    mode: "primary"
-};
-const kDBPrefix = 'databaseABC';
-
-jsTest.log('Running donorStartMigration command.');
-assert.commandWorked(donorPrimary.adminCommand({
-    donorStartMigration: 1,
-    migrationId: kMigrationId,
-    recipientConnectionString: kRecipientConnectionString,
-    databasePrefix: kDBPrefix,
-    readPreference: kReadPreference
-}));
-
-jsTest.log('Running the serverStatus command.');
-const tenantMigrationServerStatus =
-    donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
-
-// The donorStartMigration does the blocking write after updating the in-memory
-// access state to kBlockingWrites, and on completing the write the access state is
-// updated to kBlockingReadsAndWrites. Since the command doesn't return until the
-// write is completed, the state is always kBlockingReadsAndWrites after the
-// command returns.
-assert.eq(tenantMigrationServerStatus[kDBPrefix].access, accessState.kBlockingReadsAndWrites);
-assert(tenantMigrationServerStatus[kDBPrefix].blockTimestamp);
+    function startMigration(host, recipientConnString, dbName) {
+        const primary = new Mongo(host);
+        assert.commandWorked(primary.adminCommand({
+            donorStartMigration: 1,
+            migrationId: UUID(),
+            recipientConnectionString: recipientConnString,
+            databasePrefix: dbName,
+            readPreference: {mode: "primary"}
+        }));
+    }
+
+    let migrationThread =
+        new Thread(startMigration, donorPrimary.host, kRecipientConnString, dbName);
+    let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
+    migrationThread.start();
+
+    // Wait for the migration to enter the blocking state.
+    blockingFp.wait();
+
+    let mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
+    assert.eq(mtab[dbName].access, accessState.kBlockingReadsAndWrites);
+    assert(mtab[dbName].blockTimestamp);
+
+    let donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+    let blockOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne(
+        {ns: kConfigDonorsNS, op: "u", "o.databasePrefix": dbName});
+    assert.eq(donorDoc.state, "blocking");
+    assert.eq(donorDoc.blockTimestamp, blockOplogEntry.ts);
+
+    // Allow the migration to complete.
+    blockingFp.off();
+    migrationThread.join();
+
+    // TODO (SERVER-49176): test that mtab is updated correctly.
+
+    donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+    let commitOplogEntry =
+        donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
+    assert.eq(donorDoc.state, "committed");
+    assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
+})();
+
+(() => {
+    // Test the case where the migration aborts.
+    const dbName = kDBPrefix + "Abort";
+
+    let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
+    assert.commandFailedWithCode(donorPrimary.adminCommand({
+        donorStartMigration: 1,
+        migrationId: UUID(),
+        recipientConnectionString: kRecipientConnString,
+        databasePrefix: dbName,
+        readPreference: {mode: "primary"}
+    }),
+                                 ErrorCodes.InternalError);
+    abortFp.off();
+
+    // TODO (SERVER-49176): test that mtab is updated correctly.
+
+    const donorDoc = donorPrimary.getCollection(kConfigDonorsNS).findOne({databasePrefix: dbName});
+    const abortOplogEntry =
+        donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
+    assert.eq(donorDoc.state, "aborted");
+    assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts);
+})();
 
 donorRst.stopSet();
 })();
diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js
index 6904de7aae5..f34b08394bc 100644
--- a/jstests/replsets/writes_during_tenant_migration.js
+++ b/jstests/replsets/writes_during_tenant_migration.js
@@ -7,6 +7,9 @@
 (function() {
 'use strict';
 
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
+
 const kTestDoc = {
     x: -1
 };
@@ -26,14 +29,15 @@ const kMaxSize = 1024;  // max size of capped collections.
 const kTxnNumber = NumberLong(0);
 const kRecipientConnString = "testConnString";
 
-function startMigration(primary, dbName) {
-    assert.commandWorked(primary.adminCommand({
+function startMigration(host, dbName, recipientConnString) {
+    const primary = new Mongo(host);
+    return primary.adminCommand({
         donorStartMigration: 1,
         migrationId: UUID(),
-        recipientConnectionString: kRecipientConnString,
+        recipientConnectionString: recipientConnString,
         databasePrefix: dbName,
         readPreference: {mode: "primary"}
-    }));
+    });
 }
 
 function createCollectionAndInsertDocs(primaryDB, collName, isCapped, numDocs = kNumInitialDocs) {
@@ -127,6 +131,7 @@ function makeTestOptions(primary, testCase, dbName, collName, useTransaction, us
     return {
         primaryConn,
        primaryDB,
+        primaryHost: useSession ? primaryConn.getClient().host : primaryConn.host,
        runAgainstAdminDb: testCase.runAgainstAdminDb,
        command,
        dbName,
@@ -175,14 +180,35 @@ function runCommand(testOpts, expectedError) {
     }
 }
 
+/**
+ * Tests that the write succeeds when there is no migration.
+ */
 function testWriteCommandSucceeded(testCase, testOpts) {
     runCommand(testOpts);
     testCase.assertCommandSucceeded(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
 }
 
-function testWriteCommandBlocked(testCase, testOpts) {
-    startMigration(testOpts.primaryDB, testOpts.dbName);
+/**
+ * Tests that the donor rejects writes that are executed in the blocking state.
+ */
+function testWriteCommandWhenMigrationIsInBlocking(testCase, testOpts) {
+    let blockingFp =
+        configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+    let migrationThread =
+        new Thread(startMigration, testOpts.primaryHost, testOpts.dbName, kRecipientConnString);
+
+    // Run the command after the migration enters the blocking state.
+    migrationThread.start();
+    blockingFp.wait();
 
+    // TODO (SERVER-49181): assert that the command fails with MaxTimeMSExpired after the donor
+    // starts blocking writes instead of throwing an error.
     runCommand(testOpts, ErrorCodes.TenantMigrationConflict);
+
+    // Allow the migration to complete.
+    blockingFp.off();
+    migrationThread.join();
+    assert.commandWorked(migrationThread.returnData());
+
     testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
 }
@@ -661,7 +687,6 @@ rst.startSet();
 rst.initiate();
 const primary = rst.getPrimary();
 
-const kDbPrefix = "testDb";
 const kCollName = "testColl";
 
 // Validate test cases for all commands.
@@ -671,24 +696,23 @@ for (let command of Object.keys(testCases)) {
 
 // Run test cases.
 const testFuncs = {
-    noTenantMigrationActive: testWriteCommandSucceeded,  // verify that the test cases are correct.
-    tenantMigrationInBlocking: testWriteCommandBlocked,
+    noMigration: testWriteCommandSucceeded,  // verify that the test cases are correct.
+    inBlocking: testWriteCommandWhenMigrationIsInBlocking,
 };
 
 for (const [testName, testFunc] of Object.entries(testFuncs)) {
-    for (let command of Object.keys(testCases)) {
-        let testCase = testCases[command];
-        let baseDbName = kDbPrefix + "-" + testName + "-" + command;
+    for (const [commandName, testCase] of Object.entries(testCases)) {
+        let baseDbName = commandName + "-" + testName + "0";
         if (testCase.skip) {
-            print("Skipping " + command + ": " + testCase.skip);
+            print("Skipping " + commandName + ": " + testCase.skip);
             continue;
         }
 
         runTest(primary, testCase, testFunc, baseDbName + "Basic", kCollName);
 
         // TODO (SERVER-49844): Test transactional writes during migration.
-        if (testCase.isSupportedInTransaction && testName == "noTenantMigrationActive") {
+        if (testCase.isSupportedInTransaction && testName == "noMigration") {
             runTest(
                 primary, testCase, testFunc, baseDbName + "Txn", kCollName, {useTransaction: true});
         }
diff --git a/src/mongo/db/commands/tenant_migration_cmds.cpp b/src/mongo/db/commands/tenant_migration_cmds.cpp
index 388e65db61d..c9004ab690f 100644
--- a/src/mongo/db/commands/tenant_migration_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_cmds.cpp
@@ -52,7 +52,7 @@ public:
                 requestBody.getDatabasePrefix().toString(),
                 TenantMigrationDonorStateEnum::kDataSync);
 
-            tenant_migration::dataSync(opCtx, donorStateDoc);
+            tenant_migration::startMigration(opCtx, donorStateDoc);
         }
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h
index 4e45976595e..4cd70117fcd 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.h
@@ -61,7 +61,7 @@ namespace mongo {
  * }
  *
  * Writes call checkIfCanWriteOrThrow after being assigned an OpTime but before committing. The
- * method throws MigratingTenantConflict if writes are being blocked, which is caught in the loop.
+ * method throws TenantMigrationConflict if writes are being blocked, which is caught in the loop.
  * The write then blocks until the migration either commits (in which case checkIfCanWriteOrBlock
  * throws an error that causes the write to be rejected) or aborts (in which case
  * checkIfCanWriteOrBlock returns successfully and the write is retried in the loop). This loop is
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp
index bc20d69dbe7..1d052e25bd1 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.cpp
@@ -27,8 +27,6 @@
  * it in the license file.
  */
 
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
-
 #include "mongo/db/repl/tenant_migration_access_blocker_by_prefix.h"
 #include "mongo/db/repl/tenant_migration_access_blocker.h"
@@ -45,10 +43,10 @@ void TenantMigrationAccessBlockerByPrefix::add(StringData dbPrefix,
                                                std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
     stdx::lock_guard<Latch> lg(_mutex);
 
-    auto it = _migratingTenantAccessBlockers.find(dbPrefix);
-    invariant(it == _migratingTenantAccessBlockers.end());
+    auto it = _tenantMigrationAccessBlockers.find(dbPrefix);
+    invariant(it == _tenantMigrationAccessBlockers.end());
 
-    _migratingTenantAccessBlockers.emplace(dbPrefix, mtab);
+    _tenantMigrationAccessBlockers.emplace(dbPrefix, mtab);
 }
@@ -58,10 +56,10 @@ void TenantMigrationAccessBlockerByPrefix::add(StringData dbPrefix,
 void TenantMigrationAccessBlockerByPrefix::remove(StringData dbPrefix) {
     stdx::lock_guard<Latch> lg(_mutex);
 
-    auto it = _migratingTenantAccessBlockers.find(dbPrefix);
-    invariant(it != _migratingTenantAccessBlockers.end());
+    auto it = _tenantMigrationAccessBlockers.find(dbPrefix);
+    invariant(it != _tenantMigrationAccessBlockers.end());
 
-    _migratingTenantAccessBlockers.erase(it);
+    _tenantMigrationAccessBlockers.erase(it);
 }
@@ -80,11 +78,11 @@ TenantMigrationAccessBlockerByPrefix::getTenantMigrationAccessBlocker(StringData
         return dbName.startsWith(dbPrefix);
     };
 
-    auto it = std::find_if(_migratingTenantAccessBlockers.begin(),
-                           _migratingTenantAccessBlockers.end(),
+    auto it = std::find_if(_tenantMigrationAccessBlockers.begin(),
+                           _tenantMigrationAccessBlockers.end(),
                            doesDBNameStartWithPrefix);
 
-    if (it == _migratingTenantAccessBlockers.end()) {
+    if (it == _tenantMigrationAccessBlockers.end()) {
         return nullptr;
     } else {
         return it->second;
@@ -92,7 +90,7 @@ TenantMigrationAccessBlockerByPrefix::getTenantMigrationAccessBlocker(StringData
 }
 
 /**
- * Iterates through each of the MigratingTenantAccessBlockers stored by the mapping
+ * Iterates through each of the TenantMigrationAccessBlockers stored by the mapping
  * and appends the server status of each blocker to the BSONObjBuilder.
  */
 void TenantMigrationAccessBlockerByPrefix::appendInfoForServerStatus(BSONObjBuilder* builder) {
@@ -105,9 +103,9 @@ void TenantMigrationAccessBlockerByPrefix::appendInfoForServerStatus(BSONObjBuil
         builder->append(blocker.first, tenantBuilder.obj());
     };
 
-    std::for_each(_migratingTenantAccessBlockers.begin(),
-                  _migratingTenantAccessBlockers.end(),
+    std::for_each(_tenantMigrationAccessBlockers.begin(),
+                  _tenantMigrationAccessBlockers.end(),
                   appendBlockerStatus);
 }
-}  // namespace mongo
\ No newline at end of file
+}  // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h
index 861da8507c3..36b13d55ce2 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_by_prefix.h
@@ -51,11 +51,11 @@ public:
                                                                         StringData dbName);
 
 private:
-    using MigratingTenantAccessBlockersMap =
+    using TenantMigrationAccessBlockersMap =
         StringMap<std::shared_ptr<TenantMigrationAccessBlocker>>;
 
     Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationAccessBlockerByPrefix::_mutex");
-    MigratingTenantAccessBlockersMap _migratingTenantAccessBlockers;
+    TenantMigrationAccessBlockersMap _tenantMigrationAccessBlockers;
 };
-}  // namespace mongo
\ No newline at end of file
+}  // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp
index 0576660ac7e..3f667242833 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp
@@ -42,6 +42,7 @@
 #include "mongo/executor/network_interface_factory.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
 
 namespace mongo {
@@ -49,6 +50,9 @@ namespace tenant_migration {
 
 namespace {
 
+MONGO_FAIL_POINT_DEFINE(abortTenantMigrationAfterBlockingStarts);
+MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterBlockingStarts);
+
 const char kThreadNamePrefix[] = "TenantMigrationWorker-";
 const char kPoolName[] = "TenantMigrationWorkerThreadPool";
 const char kNetName[] = "TenantMigrationWorkerNetwork";
@@ -72,6 +76,26 @@ std::shared_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(
 }
 
 /**
+ * Creates a TenantMigrationAccessBlocker, and makes it start blocking writes. Then adds it to
+ * the TenantMigrationAccessBlockerByPrefix.
+ */
+std::shared_ptr<TenantMigrationAccessBlocker> startBlockingWritesForTenant(
+    OperationContext* opCtx, const TenantMigrationDonorDocument& donorStateDoc) {
+    invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
+    auto serviceContext = opCtx->getServiceContext();
+
+    auto mtab = std::make_shared<TenantMigrationAccessBlocker>(
+        serviceContext, makeTenantMigrationExecutor(serviceContext).get());
+
+    mtab->startBlockingWrites();
+
+    auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(serviceContext);
+    mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
+
+    return mtab;
+}
+
+/**
  * Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking
  * state.
  */
@@ -104,33 +128,36 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
 }
 
 /**
- * Creates a TenantMigrationAccessBlocker, and makes it start blocking writes. Then adds it to
- * the TenantMigrationAccessBlockerByPrefix.
+ * Inserts the provided donor's state document to config.tenantMigrationDonors and waits for
+ * majority write concern.
  */
-void startBlockingWritesForTenant(OperationContext* opCtx,
-                                  const TenantMigrationDonorDocument& donorStateDoc) {
-    invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
-    auto serviceContext = opCtx->getServiceContext();
-
-    executor::TaskExecutor* mtabExecutor = makeTenantMigrationExecutor(serviceContext).get();
-    auto mtab = std::make_shared<TenantMigrationAccessBlocker>(serviceContext, mtabExecutor);
-
-    mtab->startBlockingWrites();
-
-    auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(serviceContext);
-    mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab);
+void insertDonorStateDocument(OperationContext* opCtx,
+                              const TenantMigrationDonorDocument& donorStateDoc) {
+    PersistentTaskStore<TenantMigrationDonorDocument> store(
+        NamespaceString::kTenantMigrationDonorsNamespace);
+    try {
+        store.add(opCtx, donorStateDoc);
+    } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
+        uasserted(
+            4917300,
+            str::stream()
+                << "While attempting to persist the donor's state machine for tenant migration"
+                << ", found another document with the same migration id. Attempted migration: "
+                << donorStateDoc.toBSON());
+    }
 }
 
 /**
- * Updates the donor document to have state "blocking" and a blockingTimestamp.
- * Does the write by reserving an oplog slot beforehand and uses it as the blockingTimestamp.
+ * Updates the given donor's state document to have the given state. Then, persists the updated
+ * document by reserving an oplog slot beforehand and using it as the blockTimestamp or
+ * commitOrAbortOpTime depending on the state.
 */
-void updateDonorStateDocumentToBlocking(OperationContext* opCtx,
-                                        const TenantMigrationDonorDocument& originalDonorStateDoc) {
-
+void updateDonorStateDocument(OperationContext* opCtx,
+                              TenantMigrationDonorDocument& donorStateDoc,
+                              const TenantMigrationDonorStateEnum nextState) {
     uassertStatusOK(writeConflictRetry(
         opCtx,
-        "doStartBlockingWrite",
+        "updateDonorStateDoc",
         NamespaceString::kTenantMigrationDonorsNamespace.ns(),
         [&]() -> Status {
             AutoGetCollection autoCollection(
@@ -144,28 +171,32 @@ void updateDonorStateDocumentToBlocking(OperationContext* opCtx,
             }
 
             WriteUnitOfWork wuow(opCtx);
+            const auto originalDonorStateDoc = donorStateDoc.toBSON();
             const auto originalRecordId = Helpers::findOne(
-                opCtx, collection, originalDonorStateDoc.toBSON(), false /* requireIndex */);
-            const auto originalSnapshot = Snapshotted<BSONObj>(
-                opCtx->recoveryUnit()->getSnapshotId(), originalDonorStateDoc.toBSON());
+                opCtx, collection, originalDonorStateDoc, false /* requireIndex */);
+            const auto originalSnapshot =
+                Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalDonorStateDoc);
             invariant(!originalRecordId.isNull());
 
-            // Reserve an opTime for the write and use it as the blockTimestamp for the migration.
+            // Reserve an opTime for the write.
             auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
-
-            // Creates the new donor state document with the updated state and block time.
-            // Then uses the updated document as the criteria (so its available in the oplog) when
-            // creating the update arguments.
-            const BSONObj updatedDonorStateDoc([&]() {
-                TenantMigrationDonorDocument updatedDoc = originalDonorStateDoc;
-                updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
-                updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
-                return updatedDoc.toBSON();
-            }());
+            donorStateDoc.setState(nextState);
+            switch (nextState) {
+                case TenantMigrationDonorStateEnum::kBlocking:
+                    donorStateDoc.setBlockTimestamp(oplogSlot.getTimestamp());
+                    break;
+                case TenantMigrationDonorStateEnum::kCommitted:
+                case TenantMigrationDonorStateEnum::kAborted:
+                    donorStateDoc.setCommitOrAbortOpTime(oplogSlot);
+                    break;
+                default:
+                    MONGO_UNREACHABLE;
+            }
+            const auto updatedDonorStateDoc = donorStateDoc.toBSON();
 
             CollectionUpdateArgs args;
-            args.criteria = BSON("_id" << originalDonorStateDoc.getId());
+            args.criteria = BSON("_id" << donorStateDoc.getId());
             args.oplogSlot = oplogSlot;
             args.update = updatedDonorStateDoc;
@@ -181,38 +212,46 @@ void updateDonorStateDocumentToBlocking(OperationContext* opCtx,
         }));
 }
 
-/**
- * Writes the provided donor's state document to config.tenantMigrationDonors and waits for majority
- * write concern.
- */
-void persistDonorStateDocument(OperationContext* opCtx,
-                               const TenantMigrationDonorDocument& donorStateDoc) {
-    PersistentTaskStore<TenantMigrationDonorDocument> store(
-        NamespaceString::kTenantMigrationDonorsNamespace);
-    try {
-        store.add(opCtx, donorStateDoc);
-    } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
-        uasserted(
-            4917300,
-            str::stream()
-                << "While attempting to persist the donor's state machine for tenant migration"
-                << ", found another document with the same migration id. Attempted migration: "
-                << donorStateDoc.toBSON());
-    }
-}
 }  // namespace
 
+void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc) {
+    invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
 
-void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDonorStateDoc) {
-    invariant(originalDonorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
-    persistDonorStateDocument(opCtx, originalDonorStateDoc);
+    auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
+    auto mtab = mtabByPrefix.getTenantMigrationAccessBlocker(donorStateDoc.getDatabasePrefix());
 
-    // Send recipientSyncData.
+    if (mtab) {
+        // There is already an active migration for the given database prefix.
+        return;
+    }
 
-    startBlockingWritesForTenant(opCtx, originalDonorStateDoc);
+    try {
+        // Enter "dataSync" state.
+        insertDonorStateDocument(opCtx, donorStateDoc);
+
+        // TODO: Send recipientSyncData and wait for success response (i.e. the recipient's view of
+        // the data has become consistent).
+
+        // Enter "blocking" state.
+        auto mtab = startBlockingWritesForTenant(opCtx, donorStateDoc);
+
+        updateDonorStateDocument(opCtx, donorStateDoc, TenantMigrationDonorStateEnum::kBlocking);
+
+        // TODO: Send recipientSyncData with returnAfterReachingTimestamp set to the blockTimestamp.
+
+        pauseTenantMigrationAfterBlockingStarts.pauseWhileSet(opCtx);
+
+        if (abortTenantMigrationAfterBlockingStarts.shouldFail()) {
+            uasserted(ErrorCodes::InternalError, "simulate a tenant migration error");
+        }
+    } catch (DBException&) {
+        // Enter "abort" state.
+        updateDonorStateDocument(opCtx, donorStateDoc, TenantMigrationDonorStateEnum::kAborted);
+        throw;
+    }
 
-    // Update the on-disk state of the migration to "blocking" state.
-    updateDonorStateDocumentToBlocking(opCtx, originalDonorStateDoc);
+    // Enter "commit" state.
+    updateDonorStateDocument(opCtx, donorStateDoc, TenantMigrationDonorStateEnum::kCommitted);
 }
 
 void onDonorStateTransition(OperationContext* opCtx, const BSONObj& donorStateDoc) {
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 1840c197117..d3902278cf5 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -29,6 +29,7 @@
 
 #pragma once
 
+#include "mongo/db/commands/tenant_migration_cmds_gen.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/tenant_migration_state_machine_gen.h"
 #include "mongo/executor/task_executor.h"
@@ -38,10 +39,9 @@ namespace mongo {
 namespace tenant_migration {
 
 /**
- * Sends recipientSyncData to the recipient until success and starts blocking writes and causal
- * reads.
+ * Starts a tenant migration as defined in the given donor's state document.
  */
-void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDonorStateDoc);
+void startMigration(OperationContext* opCtx, TenantMigrationDonorDocument donorStateDoc);
 
 /**
  * Updates the TenantMigrationAccessBlocker for the tenant migration represented by the given
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 49c7c300443..be891e14930 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -70,6 +70,11 @@ structs:
                     "The timestamp at which writes and causal reads against the databases being
                      migrated should start blocking."
                 optional: true
+            commitOrAbortOpTime:
+                type: optime
+                description:
+                    "The opTime at which the donor's state document was set to 'committed' or
+                     'aborted'."
+                optional: true
             garbageCollect:
                 type: bool
                 description:
                     "A boolean that determines whether the state machine should be deleted after
                      a delay via the TTL monitor."
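
For reference, a donor state document shaped by the IDL above might look as follows after a committed migration. Every value here is a hypothetical placeholder; only the field names come from the IDL and the tests in this commit.

```js
// Hypothetical config.tenantMigrationDonors document after a committed
// migration. Values are illustrative placeholders, not output from the tests.
({
    _id: UUID(),                               // the migrationId passed to donorStartMigration
    databasePrefix: "testDb",                  // tenant database prefix being migrated
    state: "committed",                        // dataSync -> blocking -> committed | aborted
    blockTimestamp: Timestamp(1596000000, 1),  // set when entering the "blocking" state
    commitOrAbortOpTime: {                     // new in this commit: opTime of the
        ts: Timestamp(1596000010, 1),          // commit/abort state-document update
        t: NumberLong(1)
    },
    garbageCollect: false                      // whether the TTL monitor may delete the doc
})
```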