author     Christopher Caplinger <christopher.caplinger@mongodb.com>   2022-04-15 15:20:10 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>            2022-04-15 16:12:06 +0000
commit     53acc5a13647baaa038b8ec54b70bdf1cc17a4f8 (patch)
tree       b879546b038736d73948a25a97689eb9621f0123
parent     a22dcf11b387ed677a8e8e6854478e63dfdf6060 (diff)
download   mongo-53acc5a13647baaa038b8ec54b70bdf1cc17a4f8.tar.gz
SERVER-65084: Ensure backup cursor is opened at time > startMigrationDonorTimestamp
-rw-r--r--  jstests/replsets/tenant_migration_buildindex_shard_merge.js                         |  7
-rw-r--r--  jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js                 | 41
-rw-r--r--  jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js           |  6
-rw-r--r--  jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js |  9
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp                                | 85
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.h                                  | 10
6 files changed, 113 insertions(+), 45 deletions(-)
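
At a high level, the donor now inserts a new step, _waitUntilStartMigrationDonorTimestampIsCheckpointed, between entering the "dataSync" state and sending recipientSyncData, so that for shard merge the recipient's backup cursor is opened against a stable checkpoint that already covers startMigrationDonorTimestamp. The sketch below only illustrates that ordering; the Donor struct and its stub methods are hypothetical stand-ins for the ExecutorFuture chain in tenant_migration_donor_service.cpp, not MongoDB APIs.

    #include <iostream>

    // Hypothetical stand-in for the donor service instance; the real code chains these
    // steps as ExecutorFuture continuations cancellable via an abortToken.
    struct Donor {
        void enterDataSyncState() {
            // Real code: persist the "dataSync" state doc and await majority write concern.
            std::cout << "donor: entered dataSync\n";
        }
        void waitUntilStartMigrationDonorTimestampIsCheckpointed() {
            // New step in this commit (shard merge only): block until the last stable
            // recovery timestamp has reached startMigrationDonorTimestamp.
            std::cout << "donor: stable checkpoint covers startMigrationDonorTimestamp\n";
        }
        void sendRecipientSyncData() {
            // The recipient opens its backup cursor while serving this command, so the
            // checkpoint it copies is taken at or after startMigrationDonorTimestamp.
            std::cout << "donor: sent recipientSyncData\n";
        }
        void enterBlockingState() {
            std::cout << "donor: entered blocking\n";
        }
    };

    int main() {
        Donor donor;
        donor.enterDataSyncState();
        donor.waitUntilStartMigrationDonorTimestampIsCheckpointed();  // added before recipientSyncData
        donor.sendRecipientSyncData();
        donor.enterBlockingState();
        return 0;
    }
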
diff --git a/jstests/replsets/tenant_migration_buildindex_shard_merge.js b/jstests/replsets/tenant_migration_buildindex_shard_merge.js
index dc4285d10ac..1fd249dd9b5 100644
--- a/jstests/replsets/tenant_migration_buildindex_shard_merge.js
+++ b/jstests/replsets/tenant_migration_buildindex_shard_merge.js
@@ -28,13 +28,6 @@ load("jstests/replsets/libs/tenant_migration_util.js");
const tenantMigrationTest = new TenantMigrationTest(
{name: jsTestName(), sharedOptions: {setParameter: {maxNumActiveUserIndexBuilds: 100}}});
-if (TenantMigrationUtil.isShardMergeEnabled(tenantMigrationTest.getDonorPrimary().getDB("admin"))) {
- // TODO (SERVER-65084): Re-enable this test.
- jsTestLog("Skip: Temporarily skipping test, see SERVER-65084.");
- tenantMigrationTest.stop();
- return;
-}
-
const donorPrimary = tenantMigrationTest.getDonorPrimary();
const kTenant1Id = "testTenantId1";
const kTenant2Id = "testTenantId2";
diff --git a/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js b/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js
index 35923b68ec4..5bd6b8d93d7 100644
--- a/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js
+++ b/jstests/replsets/tenant_migration_concurrent_writes_on_recipient.js
@@ -21,21 +21,27 @@ load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
+const kTenantId = "testTenantId";
+
const tenantMigrationTest =
new TenantMigrationTest({name: jsTestName(), quickGarbageCollection: true});
-const donorRst = tenantMigrationTest.getDonorRst();
-const donorPrimary = donorRst.getPrimary();
-const recipientRst = tenantMigrationTest.getRecipientRst();
-const recipientPrimary = recipientRst.getPrimary();
-
-const kTenantId = "testTenantId";
+function cleanup(dbName) {
+ const donorPrimary = tenantMigrationTest.getDonorRst().getPrimary();
+ const donorDB = donorPrimary.getDB(dbName);
+ assert.commandWorked(donorDB.dropDatabase());
+}
(() => {
jsTest.log("Test writes during and after a migration that commits");
- const tenantId = kTenantId + "Commit";
- tenantMigrationTest.insertDonorDB(`${tenantId}_test`, "test");
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = donorRst.getPrimary();
+ const recipientPrimary = tenantMigrationTest.getRecipientRst().getPrimary();
+
+ const tenantId = `${kTenantId}Commit`;
+ const donorDB = `${tenantId}_test`;
+ tenantMigrationTest.insertDonorDB(donorDB, "test");
const ns = tenantId + "_testDb.testColl";
const tenantCollOnRecipient = recipientPrimary.getCollection(ns);
@@ -91,14 +97,18 @@ const kTenantId = "testTenantId";
tenantMigrationTest.waitForMigrationGarbageCollection(migrationOpts.migrationIdString,
migrationOpts.tenantId);
+ cleanup(donorDB);
})();
(() => {
jsTest.log("Test writes after a migration aborted before the recipient receives the " +
"returnAfterReachingTimestamp");
- const tenantId = kTenantId + "AbortBeforeReturnAfterReachingTs";
- tenantMigrationTest.insertDonorDB(`${tenantId}_test`, "test");
+ const recipientPrimary = tenantMigrationTest.getRecipientRst().getPrimary();
+
+ const tenantId = `${kTenantId}AbortBeforeReturnAfterReachingTs`;
+ const donorDB = `${tenantId}_test`;
+ tenantMigrationTest.insertDonorDB(donorDB, "test");
const ns = tenantId + "_testDb.testColl";
const tenantCollOnRecipient = recipientPrimary.getCollection(ns);
@@ -128,14 +138,19 @@ const kTenantId = "testTenantId";
tenantMigrationTest.waitForMigrationGarbageCollection(migrationOpts.migrationIdString,
migrationOpts.tenantId);
+ cleanup(donorDB);
})();
(() => {
jsTest.log("Test writes after the migration aborted after the recipient finished oplog" +
" application");
- const tenantId = kTenantId + "AbortAfterReturnAfterReachingTs";
- tenantMigrationTest.insertDonorDB(`${tenantId}_test`, "test");
+ const donorPrimary = tenantMigrationTest.getDonorRst().getPrimary();
+ const recipientPrimary = tenantMigrationTest.getRecipientRst().getPrimary();
+
+ const tenantId = `${kTenantId}AbortAfterReturnAfterReachingTs`;
+ const donorDB = `${tenantId}_test`;
+ tenantMigrationTest.insertDonorDB(donorDB, "test");
const ns = tenantId + "_testDb.testColl";
const tenantCollOnRecipient = recipientPrimary.getCollection(ns);
@@ -163,7 +178,7 @@ const kTenantId = "testTenantId";
tenantMigrationTest.waitForMigrationGarbageCollection(migrationOpts.migrationIdString,
migrationOpts.tenantId);
+ cleanup(donorDB);
})();
-
tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js b/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js
index 5f07d431147..0bb840da29f 100644
--- a/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js
+++ b/jstests/replsets/tenant_migration_recipient_invalidates_in_memory_txns.js
@@ -9,13 +9,15 @@
* because the recipient will invalidate its in-memory understanding and refetch the on-disk
* transaction state instead.
*
- * Note: this test is designed to emulate a back-and-forth migration from donor to recipient,
- * recipient to donor, then donor to recipient again.
+ * Note: incompatible_with_shard_merge because (1) this test runs back-to-back migrations, and
+ * (2) because of the two-phase nature of the database drop between migrations, wt files will
+ * still be present on the recipient during the second migration, leading to errors.
*
* @tags: [
* incompatible_with_eft,
* incompatible_with_macos,
* incompatible_with_windows_tls,
+ * incompatible_with_shard_merge,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
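
The comment added above is the whole rationale for the new incompatible_with_shard_merge tag: shard merge copies donor WiredTiger files onto the recipient, and because a dropped database is only physically removed in the second phase of the two-phase drop, files left over from the first migration can still be on disk when a back-to-back second migration tries to import the same idents. A rough, hypothetical illustration of that collision (not the server's actual import code):

    #include <filesystem>
    #include <stdexcept>
    #include <string>

    // Hypothetical sketch of the clash: importing a donor .wt file fails if the
    // two-phase drop from an earlier migration has not yet removed a file with
    // the same ident from the recipient's dbpath.
    void importDonorFile(const std::filesystem::path& dbpath, const std::string& ident) {
        const auto target = dbpath / (ident + ".wt");
        if (std::filesystem::exists(target)) {
            throw std::runtime_error("ident already present on recipient: " + ident);
        }
        // ... otherwise copy the donor's file into place ...
    }
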
diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js b/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js
index e37d579d742..57a39f4982c 100644
--- a/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js
+++ b/jstests/replsets/tenant_migration_shard_merge_recipient_fetches_retryable_writes.js
@@ -34,6 +34,7 @@ const tenantMigrationTest = new TenantMigrationTest({
});
const kTenantId = "testTenantId";
+const tenantDB = tenantMigrationTest.tenantDB(kTenantId, "database");
const donorRst = tenantMigrationTest.getDonorRst();
const donorPrimary = tenantMigrationTest.getDonorPrimary();
@@ -50,8 +51,8 @@ const cmd = {
txnNumber: NumberLong(123),
};
-assert.commandWorked(donorPrimary.getDB("database").runCommand(cmd));
-assert.eq(2, donorPrimary.getDB("database").collection.find().itcount());
+assert.commandWorked(donorPrimary.getDB(tenantDB).runCommand(cmd));
+assert.eq(2, donorPrimary.getDB(tenantDB).collection.find().itcount());
const migrationId = UUID();
const migrationOpts = {
@@ -62,10 +63,10 @@ const migrationOpts = {
jsTestLog(`Starting migration: ${tojson(migrationOpts)}`);
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
-const {ok, n} = assert.commandWorked(recipientPrimary.getDB("database").runCommand(cmd));
+const {ok, n} = assert.commandWorked(recipientPrimary.getDB(tenantDB).runCommand(cmd));
assert.eq(1, ok);
assert.eq(2, n);
-assert.eq(2, recipientPrimary.getDB("database").collection.find().itcount());
+assert.eq(2, recipientPrimary.getDB(tenantDB).collection.find().itcount());
tenantMigrationTest.stop();
})();
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index dbbd56c654f..c4b1bdc5be5 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -1168,7 +1168,8 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
pauseTenantMigrationAfterFetchingAndStoringKeys.pauseWhileSet();
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1180,17 +1181,66 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState(
pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState.pauseWhileSet();
// Enter "dataSync" state.
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, token)
- .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, abortToken)
+ .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
});
}
ExecutorFuture<void>
+TenantMigrationDonorService::Instance::_waitUntilStartMigrationDonorTimestampIsCheckpointed(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
+
+ if (getProtocol() != MigrationProtocolEnum::kShardMerge) {
+ return ExecutorFuture(**executor);
+ }
+
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ auto startMigrationDonorTimestamp = [&] {
+ stdx::lock_guard<Latch> lg(_mutex);
+ return *_stateDoc.getStartMigrationDonorTimestamp();
+ }();
+
+ invariant(startMigrationDonorTimestamp <= repl::ReplicationCoordinator::get(opCtx)
+ ->getCurrentCommittedSnapshotOpTime()
+ .getTimestamp());
+
+ // For shard merge, we set startApplyingDonorOpTime timestamp on the recipient to the donor's
+ // backup cursor checkpoint timestamp, and startMigrationDonorTimestamp to the timestamp after
+ // aborting all index builds. As a result, startApplyingDonorOpTime timestamp can be <
+ // startMigrationDonorTimestamp, which means we can erroneously fetch and apply index build
+ // operations before startMigrationDonorTimestamp. Trigger a stable checkpoint to ensure that
+ // the recipient does not fetch and apply donor index build entries before
+ // startMigrationDonorTimestamp.
+ return AsyncTry([this, self = shared_from_this(), startMigrationDonorTimestamp] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
+ if (storageEngine->getLastStableRecoveryTimestamp() < startMigrationDonorTimestamp) {
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(
+ opCtx,
+ /*stableCheckpoint*/ true);
+ }
+ })
+ .until([this, self = shared_from_this(), startMigrationDonorTimestamp](Status status) {
+ uassertStatusOK(status);
+ auto storageEngine = getGlobalServiceContext()->getStorageEngine();
+ if (storageEngine->getLastStableRecoveryTimestamp() < startMigrationDonorTimestamp) {
+ return false;
+ }
+ return true;
+ })
+ .withBackoffBetweenIterations(Backoff(Milliseconds(100), Milliseconds(100)))
+ .on(**executor, abortToken);
+}
+
+ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnterBlockingState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& token) {
+ const CancellationToken& abortToken) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) {
@@ -1198,21 +1248,24 @@ TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnt
}
}
- return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, token)
+ return _waitUntilStartMigrationDonorTimestampIsCheckpointed(executor, abortToken)
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
+ return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, abortToken);
+ })
.then([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx);
})
- .then([this, self = shared_from_this(), executor, token] {
+ .then([this, self = shared_from_this(), executor, abortToken] {
// Enter "blocking" state.
LOGV2(6104907,
"Updating its state doc to enter 'blocking' state.",
"migrationId"_attr = _migrationUuid,
"tenantId"_attr = _tenantId);
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, token)
- .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, abortToken)
+ .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
});
});
}
@@ -1221,7 +1274,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAndEnterCommittedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& token) {
+ const CancellationToken& abortToken) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) {
@@ -1233,7 +1286,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
// Source to cancel the timeout if the operation completed in time.
CancellationSource cancelTimeoutSource;
- CancellationSource recipientSyncDataSource(token);
+ CancellationSource recipientSyncDataSource(abortToken);
auto deadlineReachedFuture =
(*executor)->sleepFor(Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()),
@@ -1286,15 +1339,15 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
uasserted(ErrorCodes::InternalError, "simulate a tenant migration error");
}
})
- .then([this, self = shared_from_this(), executor, token] {
+ .then([this, self = shared_from_this(), executor, abortToken] {
// Enter "commit" state.
LOGV2(6104908,
"Entering 'committed' state.",
"migrationId"_attr = _migrationUuid,
"tenantId"_attr = _tenantId);
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, token)
- .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), token)
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, abortToken)
+ .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken)
.then([this, self = shared_from_this()] {
pauseTenantMigrationBeforeLeavingCommittedState.pauseWhileSet();
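
The heart of the new _waitUntilStartMigrationDonorTimestampIsCheckpointed step is a retry loop: while the last stable recovery timestamp is behind startMigrationDonorTimestamp, force a stable checkpoint and re-check after a short backoff. The sketch below restates that loop in plain, synchronous C++ against a toy storage-engine type; the real implementation (hunk above) uses AsyncTry/until with a 100 ms Backoff on the scoped task executor and is cancellable via the abortToken.

    #include <chrono>
    #include <cstdint>
    #include <thread>

    // Toy stand-in for the storage engine; pretends a forced stable checkpoint
    // immediately advances the last stable recovery timestamp.
    struct ToyStorageEngine {
        std::uint64_t lastStable = 0;
        std::uint64_t stableCandidate = 0;

        std::uint64_t lastStableRecoveryTimestamp() const { return lastStable; }
        void waitUntilUnjournaledWritesDurable(bool stableCheckpoint) {
            if (stableCheckpoint) lastStable = stableCandidate;
        }
    };

    // Block until a stable checkpoint at or after startMigrationDonorTimestamp exists,
    // mirroring the AsyncTry body and its 100 ms backoff between iterations.
    void waitUntilTimestampIsCheckpointed(ToyStorageEngine& storage,
                                          std::uint64_t startMigrationDonorTimestamp) {
        while (storage.lastStableRecoveryTimestamp() < startMigrationDonorTimestamp) {
            storage.waitUntilUnjournaledWritesDurable(/*stableCheckpoint=*/true);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }

    int main() {
        ToyStorageEngine storage;
        storage.stableCandidate = 42;  // hypothetical startMigrationDonorTimestamp value
        waitUntilTimestampIsCheckpointed(storage, 42);
        return 0;
    }

Polling with a bounded backoff, rather than blocking on a notification, matches the diff's choice of AsyncTry with Backoff(Milliseconds(100), Milliseconds(100)); running the wait before _sendRecipientSyncDataCommand is what keeps the recipient's backup cursor from exposing data older than startMigrationDonorTimestamp.
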
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 87931e03343..18321b11311 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -178,17 +178,21 @@ public:
ExecutorFuture<void> _enterDataSyncState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& token);
+ const CancellationToken& abortToken);
ExecutorFuture<void> _waitForRecipientToBecomeConsistentAndEnterBlockingState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& token);
+ const CancellationToken& abortToken);
+
+ ExecutorFuture<void> _waitUntilStartMigrationDonorTimestampIsCheckpointed(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
ExecutorFuture<void> _waitForRecipientToReachBlockTimestampAndEnterCommittedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& token);
+ const CancellationToken& abortToken);
ExecutorFuture<void> _handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,