diff options
author | Christopher Caplinger <christopher.caplinger@mongodb.com> | 2022-04-15 15:20:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-15 16:12:06 +0000 |
commit | 53acc5a13647baaa038b8ec54b70bdf1cc17a4f8 (patch) | |
tree | b879546b038736d73948a25a97689eb9621f0123 | |
parent | a22dcf11b387ed677a8e8e6854478e63dfdf6060 (diff) | |
download | mongo-53acc5a13647baaa038b8ec54b70bdf1cc17a4f8.tar.gz |
SERVER-65084: Ensure backup cursor is opened at time > startMigrationDonorTimestamp
6 files changed, 113 insertions, 45 deletions
diff --git a/jstests/replsets/tenant_migration_buildindex_shard_merge.js b/jstests/replsets/tenant_migration_buildindex_shard_merge.js index dc4285d10ac..1fd249dd9b5 100644 --- a/jstests/replsets/tenant_migration_buildindex_shard_merge.js +++ b/jstests/replsets/tenant_migration_buildindex_shard_merge.js @@ -28,13 +28,6 @@ load("jstests/replsets/libs/tenant_migration_util.js"); const tenantMigrationTest = new TenantMigrationTest( {name: jsTestName(), sharedOptions: {setParameter: {maxNumActiveUserIndexBuilds: 100}}}); -if (TenantMigrationUtil.isShardMergeEnabled(tenantMigrationTest.getDonorPrimary().getDB("admin"))) { - // TODO (SERVER-65084): Re-enable this test. - jsTestLog("Skip: Temporarily skipping test, see SERVER-65084."); - tenantMigrationTest.stop(); - return; -} - const donorPrimary = tenantMigrationTest.getDonorPrimary(); const kTenant1Id = "testTenantId1"; const kTenant2Id = "testTenantId2"; diff --git a/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js b/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js index 35923b68ec4..5bd6b8d93d7 100644 --- a/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js +++ b/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js @@ -21,21 +21,27 @@ load("jstests/libs/uuid_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); +const kTenantId = "testTenantId"; + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), quickGarbageCollection: true}); -const donorRst = tenantMigrationTest.getDonorRst(); -const donorPrimary = donorRst.getPrimary(); -const recipientRst = tenantMigrationTest.getRecipientRst(); -const recipientPrimary = recipientRst.getPrimary(); - -const kTenantId = "testTenantId"; +function cleanup(dbName) { + const donorPrimary = tenantMigrationTest.getDonorRst().getPrimary(); + const donorDB = donorPrimary.getDB(dbName); + assert.commandWorked(donorDB.dropDatabase()); +} (() => { jsTest.log("Test writes during and after a migration that commits"); - const tenantId = kTenantId + "Commit"; - tenantMigrationTest.insertDonorDB(`${tenantId}_test`, "test"); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = donorRst.getPrimary(); + const recipientPrimary = tenantMigrationTest.getRecipientRst().getPrimary(); + + const tenantId = `${kTenantId}Commit`; + const donorDB = `${tenantId}_test`; + tenantMigrationTest.insertDonorDB(donorDB, "test"); const ns = tenantId + "_testDb.testColl"; const tenantCollOnRecipient = recipientPrimary.getCollection(ns); @@ -91,14 +97,18 @@ const kTenantId = "testTenantId"; tenantMigrationTest.waitForMigrationGarbageCollection(migrationOpts.migrationIdString, migrationOpts.tenantId); + cleanup(donorDB); })(); (() => { jsTest.log("Test writes after a migration aborted before the recipient receives the " + "returnAfterReachingTimestamp"); - const tenantId = kTenantId + "AbortBeforeReturnAfterReachingTs"; - tenantMigrationTest.insertDonorDB(`${tenantId}_test`, "test"); + const recipientPrimary = tenantMigrationTest.getRecipientRst().getPrimary(); + + const tenantId = `${kTenantId}AbortBeforeReturnAfterReachingTs`; + const donorDB = `${tenantId}_test`; + tenantMigrationTest.insertDonorDB(donorDB, "test"); const ns = tenantId + "_testDb.testColl"; const tenantCollOnRecipient = recipientPrimary.getCollection(ns); @@ -128,14 +138,19 @@ const kTenantId = "testTenantId"; tenantMigrationTest.waitForMigrationGarbageCollection(migrationOpts.migrationIdString, migrationOpts.tenantId); + cleanup(donorDB); })(); (() => { jsTest.log("Test writes after the migration aborted after the recipient finished oplog" + " application"); - const tenantId = kTenantId + "AbortAfterReturnAfterReachingTs"; - tenantMigrationTest.insertDonorDB(`${tenantId}_test`, "test"); + const donorPrimary = tenantMigrationTest.getDonorRst().getPrimary(); + const recipientPrimary = tenantMigrationTest.getRecipientRst().getPrimary(); + + const tenantId = `${kTenantId}AbortAfterReturnAfterReachingTs`; + const donorDB = `${tenantId}_test`; + tenantMigrationTest.insertDonorDB(donorDB, "test"); const ns = tenantId + "_testDb.testColl"; const tenantCollOnRecipient = recipientPrimary.getCollection(ns); @@ -163,7 +178,7 @@ const kTenantId = "testTenantId"; tenantMigrationTest.waitForMigrationGarbageCollection(migrationOpts.migrationIdString, migrationOpts.tenantId); + cleanup(donorDB); })(); - tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js b/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js index 5f07d431147..0bb840da29f 100644 --- a/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js +++ b/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js @@ -9,13 +9,15 @@ * because the recipient will invalidate its in-memory understanding and refetch the on-disk * transaction state instead. * - * Note: this test is designed to emulate a back-and-forth migration from donor to recipient, - * recipient to donor, then donor to recipient again. + * Note: incompatible_with_shard_merge because (1) this test runs back-to-back migrations, and + * (2) because of the two-phase nature of the database drop between migrations, wt files will + * still be present on the recipient during the second migration, leading to errors. * * @tags: [ * incompatible_with_eft, * incompatible_with_macos, * incompatible_with_windows_tls, + * incompatible_with_shard_merge, * requires_majority_read_concern, * requires_persistence, * serverless, diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js b/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js index e37d579d742..57a39f4982c 100644 --- a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js +++ b/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js @@ -34,6 +34,7 @@ const tenantMigrationTest = new TenantMigrationTest({ }); const kTenantId = "testTenantId"; +const tenantDB = tenantMigrationTest.tenantDB(kTenantId, "database"); const donorRst = tenantMigrationTest.getDonorRst(); const donorPrimary = tenantMigrationTest.getDonorPrimary(); @@ -50,8 +51,8 @@ const cmd = { txnNumber: NumberLong(123), }; -assert.commandWorked(donorPrimary.getDB("database").runCommand(cmd)); -assert.eq(2, donorPrimary.getDB("database").collection.find().itcount()); +assert.commandWorked(donorPrimary.getDB(tenantDB).runCommand(cmd)); +assert.eq(2, donorPrimary.getDB(tenantDB).collection.find().itcount()); const migrationId = UUID(); const migrationOpts = { @@ -62,10 +63,10 @@ const migrationOpts = { jsTestLog(`Starting migration: ${tojson(migrationOpts)}`); TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); -const {ok, n} = assert.commandWorked(recipientPrimary.getDB("database").runCommand(cmd)); +const {ok, n} = assert.commandWorked(recipientPrimary.getDB(tenantDB).runCommand(cmd)); assert.eq(1, ok); assert.eq(2, n); -assert.eq(2, recipientPrimary.getDB("database").collection.find().itcount()); +assert.eq(2, recipientPrimary.getDB(tenantDB).collection.find().itcount()); tenantMigrationTest.stop(); })(); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index dbbd56c654f..c4b1bdc5be5 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -1168,7 +1168,8 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs } ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) { + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { pauseTenantMigrationAfterFetchingAndStoringKeys.pauseWhileSet(); { stdx::lock_guard<Latch> lg(_mutex); @@ -1180,17 +1181,66 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState( pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState.pauseWhileSet(); // Enter "dataSync" state. - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, token) - .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), token); + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, abortToken) + .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken); }); } ExecutorFuture<void> +TenantMigrationDonorService::Instance::_waitUntilStartMigrationDonorTimestampIsCheckpointed( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { + + if (getProtocol() != MigrationProtocolEnum::kShardMerge) { + return ExecutorFuture(**executor); + } + + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + auto startMigrationDonorTimestamp = [&] { + stdx::lock_guard<Latch> lg(_mutex); + return *_stateDoc.getStartMigrationDonorTimestamp(); + }(); + + invariant(startMigrationDonorTimestamp <= repl::ReplicationCoordinator::get(opCtx) + ->getCurrentCommittedSnapshotOpTime() + .getTimestamp()); + + // For shard merge, we set startApplyingDonorOpTime timestamp on the recipient to the donor's + // backup cursor checkpoint timestamp, and startMigrationDonorTimestamp to the timestamp after + // aborting all index builds. As a result, startApplyingDonorOpTime timestamp can be < + // startMigrationDonorTimestamp, which means we can erroneously fetch and apply index build + // operations before startMigrationDonorTimestamp. Trigger a stable checkpoint to ensure that + // the recipient does not fetch and apply donor index build entries before + // startMigrationDonorTimestamp. + return AsyncTry([this, self = shared_from_this(), startMigrationDonorTimestamp] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + auto storageEngine = opCtx->getServiceContext()->getStorageEngine(); + if (storageEngine->getLastStableRecoveryTimestamp() < startMigrationDonorTimestamp) { + opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable( + opCtx, + /*stableCheckpoint*/ true); + } + }) + .until([this, self = shared_from_this(), startMigrationDonorTimestamp](Status status) { + uassertStatusOK(status); + auto storageEngine = getGlobalServiceContext()->getStorageEngine(); + if (storageEngine->getLastStableRecoveryTimestamp() < startMigrationDonorTimestamp) { + return false; + } + return true; + }) + .withBackoffBetweenIterations(Backoff(Milliseconds(100), Milliseconds(100))) + .on(**executor, abortToken); +} + +ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnterBlockingState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& token) { + const CancellationToken& abortToken) { { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) { @@ -1198,21 +1248,24 @@ TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnt } } - return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, token) + return _waitUntilStartMigrationDonorTimestampIsCheckpointed(executor, abortToken) + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { + return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, abortToken); + }) .then([this, self = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx); }) - .then([this, self = shared_from_this(), executor, token] { + .then([this, self = shared_from_this(), executor, abortToken] { // Enter "blocking" state. LOGV2(6104907, "Updating its state doc to enter 'blocking' state.", "migrationId"_attr = _migrationUuid, "tenantId"_attr = _tenantId); - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, token) - .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), token); + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, abortToken) + .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken); }); }); } @@ -1221,7 +1274,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAndEnterCommittedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& token) { + const CancellationToken& abortToken) { { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) { @@ -1233,7 +1286,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd // Source to cancel the timeout if the operation completed in time. CancellationSource cancelTimeoutSource; - CancellationSource recipientSyncDataSource(token); + CancellationSource recipientSyncDataSource(abortToken); auto deadlineReachedFuture = (*executor)->sleepFor(Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()), @@ -1286,15 +1339,15 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd uasserted(ErrorCodes::InternalError, "simulate a tenant migration error"); } }) - .then([this, self = shared_from_this(), executor, token] { + .then([this, self = shared_from_this(), executor, abortToken] { // Enter "commit" state. LOGV2(6104908, "Entering 'committed' state.", "migrationId"_attr = _migrationUuid, "tenantId"_attr = _tenantId); - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, token) - .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), token) + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, abortToken) + .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken) .then([this, self = shared_from_this()] { pauseTenantMigrationBeforeLeavingCommittedState.pauseWhileSet(); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 87931e03343..18321b11311 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -178,17 +178,21 @@ public: ExecutorFuture<void> _enterDataSyncState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& token); + const CancellationToken& abortToken); ExecutorFuture<void> _waitForRecipientToBecomeConsistentAndEnterBlockingState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& token); + const CancellationToken& abortToken); + + ExecutorFuture<void> _waitUntilStartMigrationDonorTimestampIsCheckpointed( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); ExecutorFuture<void> _waitForRecipientToReachBlockTimestampAndEnterCommittedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& token); + const CancellationToken& abortToken); ExecutorFuture<void> _handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, |