summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Christopher Caplinger <christopher.caplinger@mongodb.com> 2022-02-24 18:55:36 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-02-24 19:48:03 +0000
commit: 972d3f082e84c82002c15b9ad3bbed05d77db514 (patch)
tree: 31b287e978f3af2a338dc3c5c680675891e1ca4c
parent: ead3b8d93e73f879a8f6d8087c7b527a57594003 (diff)
download: mongo-972d3f082e84c82002c15b9ad3bbed05d77db514.tar.gz
SERVER-61144: Finish importing donated collections on secondaries
-rw-r--r-- jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js 26
-rw-r--r-- jstests/replsets/tenant_migration_file_import.js 27
-rw-r--r-- jstests/replsets/tenant_migration_recipient_current_op.js 6
-rw-r--r-- jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js 21
-rw-r--r-- jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js 35
-rw-r--r-- jstests/replsets/tenant_migration_vote_progress.js 8
-rw-r--r-- src/mongo/db/repl/tenant_file_importer_service.cpp 41
-rw-r--r-- src/mongo/db/repl/tenant_file_importer_service.h 56
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_coordinator.h 2
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp 19
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.cpp 132
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.h 7
-rw-r--r-- src/mongo/db/repl/tenant_migration_shard_merge_util.cpp 213
-rw-r--r-- src/mongo/db/repl/tenant_migration_shard_merge_util.h 8
14 files changed, 352 insertions, 249 deletions
diff --git a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js
index f488c9176c9..0a624a8688f 100644
--- a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js
+++ b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js
@@ -34,12 +34,21 @@ const kMigrationFpNames = [
* donor using the 'interruptFunc', and verifies the command response using the
* 'verifyCmdResponseFunc'.
*/
-function testDonorStartMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) {
+function testDonorStartMigrationInterrupt(
+ interruptFunc, verifyCmdResponseFunc, {skipForShardMerge = false} = {}) {
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const donorRst = tenantMigrationTest.getDonorRst();
const donorPrimary = tenantMigrationTest.getDonorPrimary();
+ if (skipForShardMerge && TenantMigrationUtil.isShardMergeEnabled(donorPrimary.getDB("admin"))) {
+ // TODO SERVER-63390: Remove this conditional and ensure test(s) run
+ // successfully for shard merge.
+ jsTestLog("Skipping Shard Merge-incompatible test");
+ tenantMigrationTest.stop();
+ return;
+ }
+
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
@@ -109,12 +118,21 @@ function testDonorForgetMigrationInterrupt(interruptFunc, verifyCmdResponseFunc)
* donorAbortMigration, and interrupts the donor using the 'interruptFunc', and verifies the command
* response using the 'verifyCmdResponseFunc'.
*/
-function testDonorAbortMigrationInterrupt(interruptFunc, verifyCmdResponseFunc, fpName) {
+function testDonorAbortMigrationInterrupt(
+ interruptFunc, verifyCmdResponseFunc, fpName, {skipForShardMerge = false} = {}) {
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const donorRst = tenantMigrationTest.getDonorRst();
const donorPrimary = tenantMigrationTest.getDonorPrimary();
+ if (skipForShardMerge && TenantMigrationUtil.isShardMergeEnabled(donorPrimary.getDB("admin"))) {
+ // TODO SERVER-63390: Remove this conditional and ensure test(s) run
+ // successfully for shard merge.
+ jsTestLog("Skipping Shard Merge-incompatible test");
+ tenantMigrationTest.stop();
+ return;
+ }
+
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
@@ -186,7 +204,7 @@ function assertCmdSucceededOrInterruptedDueToShutDown(cmdThread) {
jsTest.log("Test that the donorStartMigration command is interrupted successfully on shutdown");
testDonorStartMigrationInterrupt((donorRst) => {
donorRst.stopSet();
- }, assertCmdSucceededOrInterruptedDueToShutDown);
+ }, assertCmdSucceededOrInterruptedDueToShutDown, {skipForShardMerge: true});
})();
(() => {
@@ -231,7 +249,7 @@ function assertCmdSucceededOrInterruptedDueToShutDown(cmdThread) {
testDonorAbortMigrationInterrupt((donorRst) => {
donorRst.stopSet();
- }, assertCmdSucceededOrInterruptedDueToShutDown, fpName);
+ }, assertCmdSucceededOrInterruptedDueToShutDown, fpName, {skipForShardMerge: true});
});
})();
})();
diff --git a/jstests/replsets/tenant_migration_file_import.js b/jstests/replsets/tenant_migration_file_import.js
index cc7d064ebed..11d10bdd870 100644
--- a/jstests/replsets/tenant_migration_file_import.js
+++ b/jstests/replsets/tenant_migration_file_import.js
@@ -18,9 +18,6 @@
(function() {
"use strict";
-// TODO (SERVER-61144): Recipient secondaries don't import donor files yet, so dbhash will mismatch.
-TestData.skipCheckDBHashes = true;
-
load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
@@ -70,18 +67,18 @@ const migrationOpts = {
};
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
-// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported
-// successfully.
-for (let collectionName of ["myCollection", "myCappedCollection"]) {
- jsTestLog(`Checking ${collectionName}`);
- // Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
- assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
- recipientPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
- "countDocuments");
- assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(),
- recipientPrimary.getDB("myDatabase")[collectionName].count(),
- "count");
-}
+tenantMigrationTest.getRecipientRst().nodes.forEach(node => {
+ for (let collectionName of ["myCollection", "myCappedCollection"]) {
+ jsTestLog(`Checking ${collectionName} on donor vs. recipient on port '${node.port}'`);
+ // Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
+ assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
+ node.getDB("myDatabase")[collectionName].countDocuments({}),
+ "countDocuments");
+ assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(),
+ node.getDB("myDatabase")[collectionName].count(),
+ "count");
+ }
+});
tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_recipient_current_op.js b/jstests/replsets/tenant_migration_recipient_current_op.js
index 5b277c0f500..4c6d1f80c61 100644
--- a/jstests/replsets/tenant_migration_recipient_current_op.js
+++ b/jstests/replsets/tenant_migration_recipient_current_op.js
@@ -155,11 +155,7 @@ checkStandardFieldsOK(res);
currOp = res.inprog[0];
assert.gt(new Date(), currOp.receiveStart, tojson(res));
-if (shardMergeIsEnabled) {
- assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kCopiedFiles, res);
-} else {
- assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res);
-}
+assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res);
assert.eq(currOp.migrationCompleted, false, res);
assert.eq(currOp.dataSyncCompleted, false, res);
diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js
index b006d0718a3..834c28a7309 100644
--- a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js
+++ b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js
@@ -47,7 +47,7 @@ tenantMigrationTest.insertDonorDB(tenantDB, collName);
// Ensure our new collections appear in the backup cursor's checkpoint.
assert.commandWorked(donorPrimary.adminCommand({fsync: 1}));
-const failpoint = "fpAfterRetrievingStartOpTimesMigrationRecipientInstance";
+const failpoint = "fpAfterStartingOplogApplierMigrationRecipientInstance";
const waitInFailPoint = configureFailPoint(recipientPrimary, failpoint, {action: "hang"});
// In order to prevent the copying of "testTenantId" databases via logical cloning from donor to
@@ -76,15 +76,16 @@ waitInFailPoint.off();
TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
-// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported
-// successfully.
-// Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
-assert.eq(donorPrimary.getDB(tenantDB)[collName].countDocuments({}),
- recipientPrimary.getDB(tenantDB)[collName].countDocuments({}),
- "countDocuments");
-assert.eq(donorPrimary.getDB(tenantDB)[collName].count(),
- recipientPrimary.getDB(tenantDB)[collName].count(),
- "count");
+const donorPrimaryCountDocumentsResult = donorPrimary.getDB(tenantDB)[collName].countDocuments({});
+const donorPrimaryCountResult = donorPrimary.getDB(tenantDB)[collName].count();
+
+tenantMigrationTest.getRecipientRst().nodes.forEach(node => {
+ // Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
+ assert.eq(donorPrimaryCountDocumentsResult,
+ node.getDB(tenantDB)[collName].countDocuments({}),
+ "countDocuments");
+ assert.eq(donorPrimaryCountResult, node.getDB(tenantDB)[collName].count(), "count");
+});
tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
index 6746b2d7935..35c3d125ff6 100644
--- a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
+++ b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
@@ -20,9 +20,6 @@
(function() {
"use strict";
-// TODO (SERVER-61144): Recipient secondaries don't import donor files yet, so dbhash will mismatch.
-TestData.skipCheckDBHashes = true;
-
load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/libs/fail_point_util.js");
@@ -38,6 +35,13 @@ if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
return;
}
+if (TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
+ // TODO SERVER-63789: Re-enable this test for Shard Merge
+ tenantMigrationTest.stop();
+ jsTestLog("Temporarily skipping Shard Merge test, dependent on SERVER-63789.");
+ return;
+}
+
const kDataDir =
`${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`;
assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0);
@@ -68,6 +72,7 @@ configureFailPoint(
recipientPrimary, "WTWriteConflictExceptionForImportCollection", {} /* data */, {times: 1});
configureFailPoint(
recipientPrimary, "WTWriteConflictExceptionForImportIndex", {} /* data */, {times: 1});
+configureFailPoint(recipientPrimary, "skipDeleteTempDBPath");
jsTestLog("Run migration");
// The old multitenant migrations won't copy myDatabase since it doesn't start with testTenantId,
@@ -79,18 +84,18 @@ const migrationOpts = {
};
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
-// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported
-// successfully.
-for (let collectionName of ["myCollection", "myCappedCollection"]) {
- jsTestLog(`Checking ${collectionName}`);
- // Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
- assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
- recipientPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
- "countDocuments");
- assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(),
- recipientPrimary.getDB("myDatabase")[collectionName].count(),
- "count");
-}
+tenantMigrationTest.getRecipientRst().nodes.forEach(node => {
+ for (let collectionName of ["myCollection", "myCappedCollection"]) {
+ jsTestLog(`Checking ${collectionName}`);
+ // Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
+ assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
+ recipientPrimary.getDB("myDatabase")[collectionName].countDocuments({}),
+ "countDocuments");
+ assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(),
+ recipientPrimary.getDB("myDatabase")[collectionName].count(),
+ "count");
+ }
+});
tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_vote_progress.js b/jstests/replsets/tenant_migration_vote_progress.js
index 677cbcbda6e..f593d503018 100644
--- a/jstests/replsets/tenant_migration_vote_progress.js
+++ b/jstests/replsets/tenant_migration_vote_progress.js
@@ -75,15 +75,15 @@ fpAfterCollectionClonerDone.wait();
fpAfterCollectionClonerDone.off();
if (TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
- jsTestLog("Test that voteCommitMigrationProgress succeeds with step 'copied files'");
- voteShouldSucceed(migrationId, ["copied files"]);
+ jsTestLog("Test that voteCommitMigrationProgress succeeds with step 'imported files'");
+ voteShouldSucceed(migrationId, ["imported files"]);
} else {
jsTestLog("Test that voteCommitMigrationProgress fails with shard merge disabled");
- voteShouldFail(migrationId, ["copied files"]);
+ voteShouldFail(migrationId, ["imported files"]);
}
jsTestLog("Test that voteCommitMigrationProgress fails with wrong 'step'");
-voteShouldFail(migrationId, ["imported files"]);
+voteShouldFail(migrationId, ["copied files"]);
fpAfterDataConsistentMigrationRecipientInstance.wait();
fpAfterDataConsistentMigrationRecipientInstance.off();
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp
index 8d4cde77fa8..6f479cd9133 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.cpp
+++ b/src/mongo/db/repl/tenant_file_importer_service.cpp
@@ -75,13 +75,18 @@ void TenantFileImporterService::onStartup(OperationContext*) {
void TenantFileImporterService::learnedFilename(const UUID& migrationId,
const BSONObj& metadataDoc) {
auto opCtx = cc().getOperationContext();
- stdx::lock_guard lk(_mutex);
- if (migrationId != _migrationId) {
- _reset(lk);
- _migrationId = migrationId;
- _scopedExecutor = std::make_shared<executor::ScopedTaskExecutor>(
- _executor,
- Status{ErrorCodes::CallbackCanceled, "TenantFileImporterService executor cancelled"});
+ {
+ stdx::lock_guard lk(_mutex);
+ if (migrationId != _migrationId) {
+ _reset(lk);
+ _migrationId = migrationId;
+ _scopedExecutor = std::make_shared<executor::ScopedTaskExecutor>(
+ _executor,
+ Status{ErrorCodes::CallbackCanceled,
+ "TenantFileImporterService executor cancelled"});
+ }
+
+ _state.setState(ImporterState::State::kCopyingFiles);
}
try {
@@ -112,15 +117,27 @@ void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) {
"Called learnedAllFilenames with migrationId {}, but {} is active"_format(
migrationId.toString(), _migrationId->toString()));
}
- if (_toldPrimaryAllFilesAreCopied) {
+
+ if (!_state.is(ImporterState::State::kCopyingFiles)) {
return;
}
- _toldPrimaryAllFilesAreCopied = true;
+ _state.setState(ImporterState::State::kCopiedFiles);
}
- // TODO (SERVER-62734): Keep count of files remaining to copy, wait before voting.
- _voteCommitMigrationProgress(MigrationProgressStepEnum::kCopiedFiles);
+ auto opCtx = cc().getOperationContext();
+ // TODO SERVER-63789: Revisit use of AllowLockAcquisitionOnTimestampedUnitOfWork and
+ // remove if possible.
+ // No other threads will try to acquire conflicting locks: we are acquiring
+ // database/collection locks for new tenants.
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ shard_merge_utils::importCopiedFiles(opCtx, migrationId);
+
+ // TODO (SERVER-62734): Keep count of files remaining to import, wait before voting.
+ _voteCommitMigrationProgress(MigrationProgressStepEnum::kImportedFiles);
+
+ stdx::lock_guard lk(_mutex);
+ _state.setState(ImporterState::State::kImportedFiles);
}
void TenantFileImporterService::reset() {
@@ -177,6 +194,6 @@ void TenantFileImporterService::_voteCommitMigrationProgress(MigrationProgressSt
void TenantFileImporterService::_reset(WithLock lk) {
_scopedExecutor.reset(); // Shuts down and joins the executor.
_migrationId.reset();
- _toldPrimaryAllFilesAreCopied = false;
+ _state.setState(ImporterState::State::kUninitialized);
}
} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h
index d5b0db7d629..ec3b77a986b 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.h
+++ b/src/mongo/db/repl/tenant_file_importer_service.h
@@ -89,6 +89,60 @@ private:
std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;
boost::optional<UUID> _migrationId;
Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
- bool _toldPrimaryAllFilesAreCopied;
+
+    /**
+     * Small state machine recording how far the importer has progressed. Every change goes
+     * through setState(), which tasserts that the requested transition is permitted.
+     */
+    class ImporterState {
+    public:
+        enum class State { kUninitialized, kCopyingFiles, kCopiedFiles, kImportedFiles };
+
+        /**
+         * Moves to 'nextState'. Fails tassert 6114403, reporting both the current and the
+         * requested state, when isValidTransition() rejects the change.
+         */
+        void setState(State nextState) {
+            tassert(6114403,
+                    str::stream() << "current state: " << toString(_state)
+                                  << ", new state: " << toString(nextState),
+                    isValidTransition(nextState));
+            _state = nextState;
+        }
+
+        /**
+         * Returns true when the current state equals 'state'. Does not modify the object.
+         */
+        bool is(State state) const {
+            return _state == state;
+        }
+
+    private:
+        /**
+         * Returns the human-readable name of 'value' used in the tassert message. Depends on no
+         * instance state, hence static.
+         */
+        static StringData toString(State value) {
+            switch (value) {
+                case State::kUninitialized:
+                    return "uninitialized";
+                case State::kCopyingFiles:
+                    return "copying files";
+                case State::kCopiedFiles:
+                    return "copied files";
+                case State::kImportedFiles:
+                    return "imported files";
+            }
+            // Every enumerator returns above; as in isValidTransition(), nothing follows.
+            MONGO_UNREACHABLE;
+        }
+
+        /**
+         * Returns true when moving from the current state to 'newState' is allowed. Staying in
+         * the same state is always allowed; otherwise the state may only advance one step
+         * (uninitialized -> copying -> copied -> imported) or go back to uninitialized.
+         */
+        bool isValidTransition(State newState) const {
+            if (_state == newState) {
+                return true;
+            }
+
+            switch (_state) {
+                case State::kUninitialized:
+                    return newState == State::kCopyingFiles;
+                case State::kCopyingFiles:
+                    return newState == State::kCopiedFiles || newState == State::kUninitialized;
+                case State::kCopiedFiles:
+                    return newState == State::kImportedFiles || newState == State::kUninitialized;
+                case State::kImportedFiles:
+                    return newState == State::kUninitialized;
+            }
+            MONGO_UNREACHABLE;
+        }
+
+        State _state = State::kUninitialized;
+    };
+
+ ImporterState _state;
};
} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_migration_recipient_coordinator.h b/src/mongo/db/repl/tenant_migration_recipient_coordinator.h
index 42ace3d336d..1fddf22c37e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_coordinator.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_coordinator.h
@@ -52,7 +52,7 @@ public:
// have completed the step, or there was an error.
SharedSemiFuture<void> beginAwaitingVotesForStep(UUID migrationId,
MigrationProgressStepEnum step);
- // TODO (SERVER-61144): use cancelStep, which is currently unused.
+ // TODO (SERVER-63390): use cancelStep, which is currently unused.
void cancelStep(UUID migrationId, MigrationProgressStepEnum step);
void receivedVoteForStep(UUID migrationId,
MigrationProgressStepEnum step,
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 26148732b95..d98d1ca2bbc 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -192,8 +192,6 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
createAccessBlockerIfNeeded(opCtx, recipientStateDoc);
break;
case TenantMigrationRecipientStateEnum::kLearnedFilenames:
- repl::TenantFileImporterService::get(opCtx->getServiceContext())
- ->learnedAllFilenames(recipientStateDoc.getId());
break;
case TenantMigrationRecipientStateEnum::kCopiedFiles:
break;
@@ -206,6 +204,23 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
break;
}
});
+
+ // Perform TenantFileImporterService::learnedAllFilenames work outside of the above onCommit
+ // hook because of work done in a WriteUnitOfWork.
+ // TODO SERVER-63789: Revisit this when we make file import async and move
+ // within onCommit hook.
+ auto state = recipientStateDoc.getState();
+ auto protocol = recipientStateDoc.getProtocol().value_or(kDefaultMigrationProtocol);
+ if (state == TenantMigrationRecipientStateEnum::kLearnedFilenames) {
+ tassert(6114400,
+ "Bad state '{}' for protocol '{}'"_format(
+ TenantMigrationRecipientState_serializer(state),
+ MigrationProtocol_serializer(protocol)),
+ protocol == MigrationProtocolEnum::kShardMerge);
+
+ repl::TenantFileImporterService::get(opCtx->getServiceContext())
+ ->learnedAllFilenames(recipientStateDoc.getId());
+ }
}
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index d366084d60e..4b7611b567a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -1017,10 +1017,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
(*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo(
getMigrationUUID(), _client->getServerAddress(), metadata);
- // TODO (SERVER-61145) Uncomment the following lines when we skip
- // _getStartopTimesFromDonor entirely
- // _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime);
- // _stateDoc.setStartFetchingDonorOpTime(startApplyingDonorOpTime);
+ _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime);
LOGV2_INFO(6113001,
"Opened backup cursor on donor",
"migrationId"_attr = getMigrationUUID(),
@@ -1100,7 +1097,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
}
void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) {
- // Get the last oplog entry at the read concern majority optime in the remote oplog. It
+ // Get the last oplog entry at the read concern majority optime in the remote oplog. It
// does not matter which tenant it is for.
if (_sharedData->isResuming()) {
// We are resuming a migration.
@@ -1159,6 +1156,9 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo
"migrationId"_attr = getMigrationUUID(),
"tenantId"_attr = _stateDoc.getTenantId(),
"lastOplogEntry"_attr = lastOplogEntry2OpTime.toBSON());
+
+ // TODO (SERVER-61145) We'll want to skip this as well for shard merge, since this should
+ // be set in _getDonorFilenames.
_stateDoc.setStartApplyingDonorOpTime(lastOplogEntry2OpTime);
OpTime startFetchingDonorOpTime = lastOplogEntry1OpTime;
@@ -1938,52 +1938,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() {
.semi();
}
-SemiFuture<void> TenantMigrationRecipientService::Instance::_importCopiedFiles() {
- if (getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady();
- }
-
- return ExecutorFuture(**_scopedExecutor)
- .then([this, self = shared_from_this()] {
- auto tempWTDirectory = shard_merge_utils::fileClonerTempDir(getMigrationUUID());
-
- uassert(6113315,
- str::stream() << "Missing file cloner's temporary dbpath directory: "
- << tempWTDirectory.string(),
- boost::filesystem::exists(tempWTDirectory));
-
- // TODO SERVER-63204: Reevaluate if this is the correct place to remove the temporary
- // WT dbpath.
- ON_BLOCK_EXIT([&tempWTDirectory, &migrationId = getMigrationUUID()] {
- LOGV2_DEBUG(6113324,
- 1,
- "Done importing files, removing the temporary WT dbpath",
- "migrationId"_attr = migrationId,
- "tempDbPath"_attr = tempWTDirectory.string());
- boost::system::error_code ec;
- boost::filesystem::remove_all(tempWTDirectory, ec);
- });
-
- auto opCtx = cc().makeOperationContext();
- // TODO (SERVER-62054): do this after file-copy, on primary and secondaries.
- auto metadatas =
- wiredTigerRollbackToStableAndGetMetadata(opCtx.get(), tempWTDirectory.string());
-
- // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for
- // shard merge protocol.
- try {
- shard_merge_utils::wiredTigerImportFromBackupCursor(
- opCtx.get(), metadatas, tempWTDirectory.string());
- } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) {
- LOGV2_WARNING(
- 6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus());
- }
-
- return SemiFuture<void>::makeReady();
- })
- .semi();
-}
-
SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFuture() {
stdx::lock_guard lk(_mutex);
// PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc
@@ -2394,7 +2348,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// Any FCV changes after this point will abort this migration.
//
- // The 'AsyncTry' is run on the cleanup executor as opposed to the scoped executor as we rely
+ // The 'AsyncTry' is run on the cleanup executor as opposed to the scoped executor as we rely
// on the 'PrimaryService' to interrupt the operation contexts based on thread pool and not the
// executor.
return AsyncTry([this, self = shared_from_this(), executor, token, cancelWhenDurable] {
@@ -2558,16 +2512,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return SemiFuture<void>::makeReady().thenRunOn(**_scopedExecutor);
}
- // Before we tell all nodes what files to copy, prepare to receive their
- // voteCommitMigrationProgress commands telling the primary when they finish.
- {
- stdx::lock_guard lk(_mutex);
- _copiedFilesFuture =
- TenantMigrationRecipientCoordinator::get(_serviceContext)
- ->beginAwaitingVotesForStep(
- getMigrationUUID(), MigrationProgressStepEnum::kCopiedFiles);
- }
-
return AsyncTry([this, self = shared_from_this(), token] {
return _getDonorFilenames(token);
})
@@ -2605,34 +2549,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_donorFilenameBackupCursorId,
_donorFilenameBackupCursorNamespaceString);
})
- .then([this, self = shared_from_this(), token] {
- if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady();
- }
-
- stdx::lock_guard lk(_mutex);
- _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames);
- return _updateStateDocForMajority(lk);
- })
.then([this, self = shared_from_this()] {
- if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
- return SemiFuture<void>::makeReady();
- }
-
- LOGV2_INFO(6113402,
- "Waiting for all nodes to call voteCommitMigrationProgress with "
- "step 'copied files'");
- return std::move(_copiedFilesFuture).semi();
- })
- .then([this, self = shared_from_this()] { return _killBackupCursor(); })
- .then([this, self = shared_from_this()] {
- stdx::lock_guard lk(_mutex);
- if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) {
- _stateDoc.setState(TenantMigrationRecipientStateEnum::kCopiedFiles);
- return _updateStateDocForMajority(lk);
- }
-
- return SemiFuture<void>::makeReady();
+ return _advanceStableTimestampToStartApplyingDonorOpTime();
})
.then([this, self = shared_from_this()] {
stdx::lock_guard lk(_mutex);
@@ -2730,10 +2648,42 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return clonerFuture;
})
.then([this, self = shared_from_this()] { return _onCloneSuccess(); })
+ .then([this, self = shared_from_this(), token] {
+ if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ stdx::lock_guard lk(_mutex);
+ // Before we tell all nodes that every filename has been learned, prepare to receive their
+ // voteCommitMigrationProgress commands telling the primary when they finish importing.
+ _importedFilesFuture =
+ TenantMigrationRecipientCoordinator::get(_serviceContext)
+ ->beginAwaitingVotesForStep(
+ getMigrationUUID(), MigrationProgressStepEnum::kImportedFiles);
+
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames);
+ return _updateStateDocForMajority(lk);
+ })
.then([this, self = shared_from_this()] {
- return _advanceStableTimestampToStartApplyingDonorOpTime();
+ if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ LOGV2_INFO(6113402,
+ "Waiting for all nodes to call voteCommitMigrationProgress with "
+ "step 'imported files'");
+ return std::move(_importedFilesFuture).semi();
+ })
+ .then([this, self = shared_from_this()] { return _killBackupCursor(); })
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard lk(_mutex);
+ if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) {
+ _stateDoc.setState(TenantMigrationRecipientStateEnum::kCopiedFiles);
+ return _updateStateDocForMajority(lk);
+ }
+
+ return SemiFuture<void>::makeReady();
})
- .then([this, self = shared_from_this()] { return _importCopiedFiles(); })
.then([this, self = shared_from_this()] {
{
auto opCtx = cc().makeOperationContext();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 5f3898e8b0b..a31506e20a4 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -466,11 +466,6 @@ public:
SemiFuture<void> _onCloneSuccess();
/*
- * For protocol "shard merge", import donor data files.
- */
- SemiFuture<void> _importCopiedFiles();
-
- /*
* Returns a future that will be fulfilled when the tenant migration reaches consistent
* state.
*/
@@ -595,8 +590,6 @@ public:
// Promise that is resolved when the instance has started the tenant database cloner
// and tenant oplog fetcher.
SharedPromise<void> _dataSyncStartedPromise; // (W)
- // Future that is resolved when all recipient nodes have copied all donor files.
- SharedSemiFuture<void> _copiedFilesFuture; // (W)
// Future that is resolved when all recipient nodes have imported all donor files.
SharedSemiFuture<void> _importedFilesFuture; // (W)
// Promise that is resolved when the tenant data sync has reached a consistent point.
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
index 5c39e53af19..700e0d34916 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
@@ -59,10 +59,16 @@ constexpr int kBackupCursorKeepAliveIntervalMillis = mongo::kCursorTimeoutMillis
namespace mongo::repl::shard_merge_utils {
namespace {
+MONGO_FAIL_POINT_DEFINE(skipDeleteTempDBPath);
using namespace fmt::literals;
void moveFile(const std::string& src, const std::string& dst) {
LOGV2_DEBUG(6114304, 1, "Moving file", "src"_attr = src, "dst"_attr = dst);
+
+ tassert(6114401,
+ "Destination file '{}' already exists"_format(dst),
+ !boost::filesystem::exists(dst));
+
// Boost filesystem functions clear "ec" on success.
boost::system::error_code ec;
boost::filesystem::rename(src, dst, ec);
@@ -122,75 +128,6 @@ Status connect(const HostAndPort& source, DBClientConnection* client) {
return replAuthenticate(client).withContext(str::stream()
<< "Failed to authenticate to " << source);
}
-} // namespace
-
-void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
- std::unique_ptr<DBClientConnection> client;
- std::unique_ptr<TenantMigrationSharedData> sharedData;
- auto writerPool =
- makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
-
- ON_BLOCK_EXIT([&] {
- client->shutdownAndDisallowReconnect();
-
- writerPool->shutdown();
- writerPool->join();
- });
-
- auto fileName = metadataDoc["filename"].str();
- auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
- LOGV2_DEBUG(6113320,
- 1,
- "Cloning file",
- "migrationId"_attr = migrationId,
- "metadata"_attr = metadataDoc);
- auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
- auto remoteDbpath = metadataDoc["remoteDbpath"].str();
- size_t fileSize = metadataDoc["fileSize"].safeNumberLong();
- auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str());
- invariant(!relativePath.empty());
-
- // Connect the client.
- if (!client) {
- auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
- client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
- uassertStatusOK(connect(donor, client.get()));
- }
-
- if (!sharedData) {
- sharedData = std::make_unique<TenantMigrationSharedData>(
- getGlobalServiceContext()->getFastClockSource(), migrationId);
- }
-
- auto currentBackupFileCloner =
- std::make_unique<TenantFileCloner>(backupId,
- migrationId,
- fileName,
- fileSize,
- relativePath,
- sharedData.get(),
- client->getServerHostAndPort(),
- client.get(),
- repl::StorageInterface::get(cc().getServiceContext()),
- writerPool.get());
-
- auto cloneStatus = currentBackupFileCloner->run();
- if (!cloneStatus.isOK()) {
- LOGV2_WARNING(6113321,
- "Failed to clone file ",
- "migrationId"_attr = migrationId,
- "fileName"_attr = fileName,
- "error"_attr = cloneStatus);
- } else {
- LOGV2_DEBUG(6113322,
- 1,
- "Cloned file",
- "migrationId"_attr = migrationId,
- "fileName"_attr = fileName);
- }
-
- uassertStatusOK(cloneStatus);
-}
void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
const std::vector<CollectionImportMetadata>& metadatas,
@@ -199,12 +136,26 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
/*
* Move one collection file and one or more index files from temp dir to dbpath.
*/
- moveFile(constructSourcePath(importPath, collectionMetadata.importArgs.ident),
- constructDestinationPath(collectionMetadata.importArgs.ident));
+ auto collFileSourcePath =
+ constructSourcePath(importPath, collectionMetadata.importArgs.ident);
+ auto collFileDestPath = constructDestinationPath(collectionMetadata.importArgs.ident);
+
+ moveFile(collFileSourcePath, collFileDestPath);
+
+ ScopeGuard revertCollFileMove([&] { moveFile(collFileDestPath, collFileSourcePath); });
+
+ auto indexPaths = std::vector<std::tuple<std::string, std::string>>();
+ ScopeGuard revertIndexFileMove([&] {
+ for (const auto& pathTuple : indexPaths) {
+ moveFile(std::get<1>(pathTuple), std::get<0>(pathTuple));
+ }
+ });
for (auto&& indexImportArgs : collectionMetadata.indexes) {
- moveFile(constructSourcePath(importPath, indexImportArgs.ident),
- constructDestinationPath(indexImportArgs.ident));
+ auto indexFileSourcePath = constructSourcePath(importPath, indexImportArgs.ident);
+ auto indexFileDestPath = constructDestinationPath(indexImportArgs.ident);
+ moveFile(indexFileSourcePath, indexFileDestPath);
+ indexPaths.push_back(std::tuple(indexFileSourcePath, indexFileDestPath));
}
/*
@@ -222,7 +173,9 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
AutoGetDb autoDb(opCtx, nss.db(), MODE_IX);
Lock::CollectionLock collLock(opCtx, nss, MODE_X);
auto catalog = CollectionCatalog::get(opCtx);
- WriteUnitOfWork wunit(opCtx);
+ // TODO SERVER-63789 Uncomment WriteUnitOfWork declaration below when we
+ // make file import async.
+ // WriteUnitOfWork wunit(opCtx);
AutoStatsTracker statsTracker(opCtx,
nss,
Top::LockType::NotLocked,
@@ -264,15 +217,89 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
makeCountsChange(ownedCollection->getRecordStore(), collectionMetadata));
UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection));
- wunit.commit();
+ // TODO SERVER-63789 Uncomment wunit.commit() call below when we
+ // make file copy/import async.
+ // wunit.commit();
LOGV2(6114300,
"Imported donor collection",
"ns"_attr = nss,
"numRecordsApprox"_attr = collectionMetadata.numRecords,
"dataSizeApprox"_attr = collectionMetadata.dataSize);
});
+
+ revertCollFileMove.dismiss();
+ revertIndexFileMove.dismiss();
}
}
+} // namespace
+
+/**
+ * Copies the single donor file described by 'metadataDoc' using a TenantFileCloner.
+ *
+ * Reads the file name, file size, migration id, backup id, donor host and donor dbpath from
+ * 'metadataDoc'. Throws if either id fails to parse, if connecting or authenticating to the
+ * donor fails, or if the cloner finishes with a non-OK status.
+ */
+void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
+    std::unique_ptr<DBClientConnection> client;
+    std::unique_ptr<TenantMigrationSharedData> sharedData;
+    auto writerPool =
+        makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
+
+    ON_BLOCK_EXIT([&] {
+        // 'client' is still null when one of the statements below throws before the connection
+        // object has been created (for example on a malformed migration id), so it must be
+        // checked before it is dereferenced.
+        if (client) {
+            client->shutdownAndDisallowReconnect();
+        }
+
+        writerPool->shutdown();
+        writerPool->join();
+    });
+
+    auto fileName = metadataDoc["filename"].str();
+    auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
+    LOGV2_DEBUG(6113320,
+                1,
+                "Cloning file",
+                "migrationId"_attr = migrationId,
+                "metadata"_attr = metadataDoc);
+    auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
+    size_t fileSize = metadataDoc["fileSize"].safeNumberLong();
+    auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str());
+    invariant(!relativePath.empty());
+
+    // Connect the client.
+    auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
+    client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
+    uassertStatusOK(connect(donor, client.get()));
+
+    sharedData = std::make_unique<TenantMigrationSharedData>(
+        getGlobalServiceContext()->getFastClockSource(), migrationId);
+
+    auto currentBackupFileCloner =
+        std::make_unique<TenantFileCloner>(backupId,
+                                           migrationId,
+                                           fileName,
+                                           fileSize,
+                                           relativePath,
+                                           sharedData.get(),
+                                           client->getServerHostAndPort(),
+                                           client.get(),
+                                           repl::StorageInterface::get(cc().getServiceContext()),
+                                           writerPool.get());
+
+    auto cloneStatus = currentBackupFileCloner->run();
+    if (!cloneStatus.isOK()) {
+        LOGV2_WARNING(6113321,
+                      "Failed to clone file ",
+                      "migrationId"_attr = migrationId,
+                      "fileName"_attr = fileName,
+                      "error"_attr = cloneStatus);
+    } else {
+        LOGV2_DEBUG(6113322,
+                    1,
+                    "Cloned file",
+                    "migrationId"_attr = migrationId,
+                    "fileName"_attr = fileName);
+    }
+
+    // Surface a failed clone to the caller as an exception.
+    uassertStatusOK(cloneStatus);
+}
SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
std::shared_ptr<executor::TaskExecutor> executor,
@@ -296,4 +323,40 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
.onCompletion([](auto&&) {})
.semi();
}
+
+/**
+ * Imports the donor files found in the file cloner's temporary directory for 'migrationId'.
+ *
+ * Throws if that directory does not exist. On exit from this function the temporary directory
+ * is removed, unless the 'skipDeleteTempDBPath' failpoint is enabled. A NamespaceExists error
+ * raised by the import step is logged as a warning and otherwise ignored.
+ */
+void importCopiedFiles(OperationContext* opCtx, UUID migrationId) {
+    const auto importDir = fileClonerTempDir(migrationId);
+    uassert(6113315,
+            str::stream() << "Missing file cloner's temporary dbpath directory: "
+                          << importDir.string(),
+            boost::filesystem::exists(importDir));
+
+    // TODO SERVER-63204: Evaluate correct place to remove the temporary
+    // WT dbpath.
+    ON_BLOCK_EXIT([&] {
+        // TODO SERVER-63789: Delete skipDeleteTempDBPath failpoint
+        const bool keepTempDir = skipDeleteTempDBPath.shouldFail();
+        if (!keepTempDir) {
+            LOGV2_DEBUG(6113324,
+                        1,
+                        "Done importing files, removing the temporary WT dbpath",
+                        "migrationId"_attr = migrationId,
+                        "tempDbPath"_attr = importDir.string());
+            // Best-effort cleanup: the error code is collected but not acted upon.
+            boost::system::error_code removeError;
+            boost::filesystem::remove_all(importDir, removeError);
+            return;
+        }
+        LOGV2(6114402,
+              "skipDeleteTempDBPath failpoint enabled, skipping temp directory cleanup.");
+    });
+
+    const auto importPath = importDir.string();
+    // Metadata collection stays outside the try block so only the import step is shielded.
+    auto metadatas = wiredTigerRollbackToStableAndGetMetadata(opCtx, importPath);
+
+    // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for
+    // shard merge protocol.
+    try {
+        wiredTigerImportFromBackupCursor(opCtx, metadatas, importPath);
+    } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) {
+        LOGV2_WARNING(6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus());
+    }
+}
} // namespace mongo::repl::shard_merge_utils
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.h b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
index 217e601b70a..1c51325bb72 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.h
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
@@ -39,7 +39,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/cancellation.h"
@@ -107,12 +106,7 @@ struct MetadataInfo {
*/
void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc);
-/**
- * After calling wiredTigerRollbackToStableAndGetMetadata, use this function to import files.
- */
-void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
- const std::vector<CollectionImportMetadata>& metadatas,
- const std::string& importPath);
+/**
+ * Imports the donor files that were copied into the file cloner's temporary directory for the
+ * migration identified by 'migrationId'. Throws if that temporary directory does not exist.
+ */
+void importCopiedFiles(OperationContext* opCtx, UUID migrationId);
SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
std::shared_ptr<executor::TaskExecutor> executor,