author     Jordi Serra Torrens <jordi.serra-torrens@mongodb.com>  2022-05-17 09:43:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>       2022-05-17 13:24:50 +0000
commit     c900fa624cda490f8664bff6facfca9f38d243ae (patch)
tree       3d08d2de34c6cd0029e21b1b051615ed3e910266
parent     7aa9056354f32d2681326c44c8f9fbf600775546 (diff)
SERVER-65947 MigrationDestinationManager must recover if an error occurs during release of the critical section
(cherry picked from commit 5bd946b1fcda93b27ab4c34297454c64c2b9702a)
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml                   |   4
-rw-r--r--  jstests/sharding/migration_retries_on_write_conflict_exceptions.js  |   7
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp                    | 326
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h                       |  30
-rw-r--r--  src/mongo/db/s/migration_util.cpp                                    |  12
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.cpp            |  30
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.h              |   5
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination_test.cpp       |  95
8 files changed, 356 insertions, 153 deletions
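
The core of the change is visible in _migrateThread below: instead of giving up when an error occurs while the recipient critical section is held, the thread now loops and re-runs the recovery path until it succeeds or a stepdown cancels it. The following minimal sketch is illustrative only, not MongoDB code: it uses a std::atomic<bool> as a stand-in for the CancellationToken, and the helper names (recoveryDocumentExists, runMigrationOnce, State) are hypothetical.

```cpp
#include <atomic>
#include <exception>

enum class State { kInit, kDone, kFail };

// Stand-in for migrationRecipientRecoveryDocumentExists() (hypothetical).
bool recoveryDocumentExists() {
    return true;
}

// Stand-in for the body of _migrateDriver() (hypothetical).
void runMigrationOnce(bool /*recovering*/) {}

State migrateThreadLoop(const std::atomic<bool>& cancelled) {
    State state = State::kInit;
    bool recovering = false;
    while (true) {
        try {
            if (recovering && !recoveryDocumentExists()) {
                break;  // Nothing left to recover; keep the last state.
            }
            runMigrationOnce(recovering);
            state = State::kDone;
        } catch (const std::exception&) {
            state = State::kFail;
            if (!cancelled.load()) {
                recovering = true;  // Retry via the recovery path.
                continue;
            }
        }
        break;  // Success, or a cancelled (stepping-down) node: stop retrying.
    }
    return state;
}

int main() {
    std::atomic<bool> cancelled{false};
    return migrateThreadLoop(cancelled) == State::kDone ? 0 : 1;
}
```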
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 56151a12bde..45dfac12e7a 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -218,6 +218,8 @@ last-continuous:
test_file: jstests/core/where_multiple_plans.js
- ticket: SERVER-66089
test_file: jstests/replsets/initial_sync_with_partial_transaction.js
+ - ticket: SERVER-65947
+ test_file: jstests/sharding/migration_retries_on_write_conflict_exceptions.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
@@ -571,6 +573,8 @@ last-lts:
test_file: jstests/core/where_multiple_plans.js
- ticket: SERVER-66089
test_file: jstests/replsets/initial_sync_with_partial_transaction.js
+ - ticket: SERVER-65947
+ test_file: jstests/sharding/migration_retries_on_write_conflict_exceptions.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
diff --git a/jstests/sharding/migration_retries_on_write_conflict_exceptions.js b/jstests/sharding/migration_retries_on_write_conflict_exceptions.js
index 0dd66c5f660..2c5c4656a0b 100644
--- a/jstests/sharding/migration_retries_on_write_conflict_exceptions.js
+++ b/jstests/sharding/migration_retries_on_write_conflict_exceptions.js
@@ -59,5 +59,10 @@ preTransferModsFailpoint.off();
awaitResult();
+// After the migration has finished, check that writes are possible on both shards (meaning the
+// critical sections have been properly released).
+assert.commandWorked(testColl.update({x: 49}, {$set: {c: 1}}));
+assert.commandWorked(testColl.update({x: 50}, {$set: {c: 1}}));
+
st.stop();
-})();
\ No newline at end of file
+})();
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index cb9b554ff88..62ace3566b0 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -282,9 +282,9 @@ bool migrationRecipientRecoveryDocumentExists(OperationContext* opCtx,
PersistentTaskStore<MigrationRecipientRecoveryDocument> store(
NamespaceString::kMigrationRecipientsNamespace);
- BSONObjBuilder bob;
- sessionId.append(&bob);
- return store.count(opCtx, bob.obj()) > 0;
+ return store.count(opCtx,
+ BSON(MigrationRecipientRecoveryDocument::kMigrationSessionIdFieldName
+ << sessionId.toString())) > 0;
}
// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread which
@@ -303,10 +303,17 @@ MONGO_FAIL_POINT_DEFINE(migrationRecipientFailPostCommitRefresh);
} // namespace
+const ReplicaSetAwareServiceRegistry::Registerer<MigrationDestinationManager> mdmRegistry(
+ "MigrationDestinationManager");
+
MigrationDestinationManager::MigrationDestinationManager() = default;
MigrationDestinationManager::~MigrationDestinationManager() = default;
+MigrationDestinationManager* MigrationDestinationManager::get(ServiceContext* serviceContext) {
+ return &getMigrationDestinationManager(serviceContext);
+}
+
MigrationDestinationManager* MigrationDestinationManager::get(OperationContext* opCtx) {
return &getMigrationDestinationManager(opCtx->getServiceContext());
}
@@ -463,6 +470,12 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_sessionId = cloneRequest.getSessionId();
_scopedReceiveChunk = std::move(scopedReceiveChunk);
+ invariant(!_canReleaseCriticalSectionPromise);
+ _canReleaseCriticalSectionPromise = std::make_unique<SharedPromise<void>>();
+
+ invariant(!_migrateThreadFinishedPromise);
+ _migrateThreadFinishedPromise = std::make_unique<SharedPromise<State>>();
+
// TODO: If we are here, the migrate thread must have completed, otherwise _active above
// would be false, so this would never block. There is no better place with the current
// implementation where to join the thread.
@@ -470,11 +483,13 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_migrateThreadHandle.join();
}
- _sessionMigration =
- std::make_unique<SessionCatalogMigrationDestination>(_nss, _fromShard, *_sessionId);
+ _sessionMigration = std::make_unique<SessionCatalogMigrationDestination>(
+ _nss, _fromShard, *_sessionId, _cancellationSource.token());
ShardingStatistics::get(opCtx).countRecipientMoveChunkStarted.addAndFetch(1);
- _migrateThreadHandle = stdx::thread([this]() { _migrateThread(); });
+ _migrateThreadHandle = stdx::thread([this, cancellationToken = _cancellationSource.token()]() {
+ _migrateThread(cancellationToken);
+ });
return Status::OK();
}
@@ -483,7 +498,8 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState(
OperationContext* opCtx,
ScopedReceiveChunk scopedReceiveChunk,
const MigrationRecipientRecoveryDocument& recoveryDoc) {
- invariant(!isActive());
+ stdx::lock_guard<Latch> sl(_mutex);
+ invariant(!_sessionId);
_scopedReceiveChunk = std::move(scopedReceiveChunk);
_nss = recoveryDoc.getNss();
@@ -496,26 +512,21 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState(
_state = kCommitStart;
_acquireCSOnRecipient = true;
- LOGV2(6064500, "Recovering migration recipient", "sessionId"_attr = *_sessionId);
-
- RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
- opCtx, _nss, criticalSectionReason(*_sessionId), ShardingCatalogClient::kLocalWriteConcern);
+ invariant(!_canReleaseCriticalSectionPromise);
+ _canReleaseCriticalSectionPromise = std::make_unique<SharedPromise<void>>();
- LOGV2_DEBUG(6064501,
- 2,
- "Reacquired migration recipient critical section",
- "sessionId"_attr = *_sessionId);
+ invariant(!_migrateThreadFinishedPromise);
+ _migrateThreadFinishedPromise = std::make_unique<SharedPromise<State>>();
- _state = kEnteredCritSec;
+ LOGV2(6064500, "Recovering migration recipient", "sessionId"_attr = *_sessionId);
if (_migrateThreadHandle.joinable()) {
_migrateThreadHandle.join();
}
- _migrateThreadHandle =
- stdx::thread([this]() { _migrateThread(true /* skipToCritSecTaken */); });
-
- LOGV2(6064503, "Recovered migration recipient", "sessionId"_attr = *_sessionId);
+ _migrateThreadHandle = stdx::thread([this, cancellationToken = _cancellationSource.token()]() {
+ _migrateThread(cancellationToken, true /* skipToCritSecTaken */);
+ });
return Status::OK();
}
@@ -711,51 +722,52 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
Status MigrationDestinationManager::exitCriticalSection(OperationContext* opCtx,
const MigrationSessionId& sessionId) {
- stdx::unique_lock<Latch> lock(_mutex);
- if (!_sessionId || !_sessionId->matches(sessionId)) {
- LOGV2_DEBUG(5899104,
- 2,
- "Request to exit recipient critical section does not match current session",
- "requested"_attr = sessionId,
- "current"_attr = _sessionId);
-
- if (migrationRecipientRecoveryDocumentExists(opCtx, sessionId)) {
- // This node may have stepped down and interrupted the migrateThread, which reset
- // _sessionId. But the critical section may not have been released so it will be
- // recovered by the new primary.
- return {ErrorCodes::CommandFailed,
- "Recipient migration recovery document still exists"};
- }
+ SharedSemiFuture<State> threadFinishedFuture;
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ if (!_sessionId || !_sessionId->matches(sessionId)) {
+ LOGV2_DEBUG(5899104,
+ 2,
+ "Request to exit recipient critical section does not match current session",
+ "requested"_attr = sessionId,
+ "current"_attr = _sessionId);
+
+ if (migrationRecipientRecoveryDocumentExists(opCtx, sessionId)) {
+ // This node may have stepped down and interrupted the migrateThread, which reset
+ // _sessionId. But the critical section may not have been released so it will be
+ // recovered by the new primary.
+ return {ErrorCodes::CommandFailed,
+ "Recipient migration recovery document still exists"};
+ }
- // Ensure the command's wait for writeConcern will until the recovery document is deleted.
- repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
+ // Ensure the command's wait for writeConcern will wait until the recovery document
+ // is deleted.
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
- return Status::OK();
- }
+ return Status::OK();
+ }
- if (_state < kEnteredCritSec) {
- return {ErrorCodes::CommandFailed, "recipient critical section has not yet been entered"};
- }
+ if (_state < kEnteredCritSec) {
+ return {ErrorCodes::CommandFailed,
+ "recipient critical section has not yet been entered"};
+ }
- // If the thread is waiting to be signaled to release the CS, signal it by transitioning
- // _state to EXIT_CRIT_SEC
- if (_state == kEnteredCritSec) {
- _state = kExitCritSec;
- _stateChangedCV.notify_all();
+ // Fulfill the promise to let the migrateThread release the critical section.
+ invariant(_canReleaseCriticalSectionPromise);
+ if (!_canReleaseCriticalSectionPromise->getFuture().isReady()) {
+ _canReleaseCriticalSectionPromise->emplaceValue();
+ }
+
+ threadFinishedFuture = _migrateThreadFinishedPromise->getFuture();
}
- // Wait for the thread to finish
- opCtx->waitForConditionOrInterrupt(_isActiveCV, lock, [&]() { return !_sessionId; });
+ // Wait for the migrateThread to finish
+ const auto threadFinishState = threadFinishedFuture.get(opCtx);
- if (_state != kDone) {
+ if (threadFinishState != kDone) {
return {ErrorCodes::CommandFailed, "exitCriticalSection failed"};
}
- dassert(
- !migrationRecipientRecoveryDocumentExists(opCtx, sessionId),
- str::stream() << "Recipient migration recovery document should not exist exist. SessionId: "
- << sessionId.toString());
-
LOGV2_DEBUG(
5899105, 2, "Succeeded releasing recipient critical section", "requested"_attr = sessionId);
@@ -1041,7 +1053,10 @@ void MigrationDestinationManager::cloneCollectionIndexesAndOptions(
}
}
-void MigrationDestinationManager::_migrateThread(bool skipToCritSecTaken) {
+void MigrationDestinationManager::_migrateThread(CancellationToken cancellationToken,
+ bool skipToCritSecTaken) {
+ invariant(_sessionId);
+
Client::initThread("migrateThread");
auto client = Client::getCurrent();
{
@@ -1049,41 +1064,77 @@ void MigrationDestinationManager::_migrateThread(bool skipToCritSecTaken) {
client->setSystemOperationKillableByStepdown(lk);
}
- auto uniqueOpCtx = client->makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
+ bool recovering = false;
+ while (true) {
+ const auto executor =
+ Grid::get(client->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto uniqueOpCtx =
+ CancelableOperationContext(client->makeOperationContext(), cancellationToken, executor);
+ auto opCtx = uniqueOpCtx.get();
- if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) {
- AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx);
- }
+ if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) {
+ AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx);
+ }
- try {
- // The outer OperationContext is used to hold the session checked out for the
- // duration of the recipient's side of the migration. This guarantees that if the
- // donor shard has failed over, then the new donor primary cannot bump the
- // txnNumber on this session while this node is still executing the recipient side
- // (which is important because otherwise, this node may create orphans after the
- // range deletion task on this node has been processed). The recipient will periodically
- // yield this session, but will verify the txnNumber has not changed before continuing,
- // preserving the guarantee that orphans cannot be created after the txnNumber is advanced.
- opCtx->setLogicalSessionId(_lsid);
- opCtx->setTxnNumber(_txnNumber);
-
- MongoDOperationContextSession sessionTxnState(opCtx);
-
- auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(opCtx,
- {*opCtx->getTxnNumber()},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
- _migrateDriver(opCtx, skipToCritSecTaken);
- } catch (...) {
- _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
+ try {
+ if (recovering) {
+ if (!migrationRecipientRecoveryDocumentExists(opCtx, *_sessionId)) {
+ // No need to run any recovery.
+ break;
+ }
+ }
+
+ // The outer OperationContext is used to hold the session checked out for the
+ // duration of the recipient's side of the migration. This guarantees that if the
+ // donor shard has failed over, then the new donor primary cannot bump the
+ // txnNumber on this session while this node is still executing the recipient side
+ // (which is important because otherwise, this node may create orphans after the
+ // range deletion task on this node has been processed). The recipient will periodically
+ // yield this session, but will verify the txnNumber has not changed before continuing,
+ // preserving the guarantee that orphans cannot be created after the txnNumber is
+ // advanced.
+ opCtx->setLogicalSessionId(_lsid);
+ opCtx->setTxnNumber(_txnNumber);
+
+ MongoDOperationContextSession sessionTxnState(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+ _migrateDriver(opCtx, skipToCritSecTaken || recovering);
+ } catch (...) {
+ _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
+
+ if (!cancellationToken.isCanceled() && _acquireCSOnRecipient) {
+ // Run recovery if needed.
+ recovering = true;
+ continue;
+ }
+ }
+
+ break;
}
stdx::lock_guard<Latch> lk(_mutex);
_sessionId.reset();
_scopedReceiveChunk.reset();
_isActiveCV.notify_all();
+
+ // If we reached this point without having set _canReleaseCriticalSectionPromise we must be on
+ // an error path. Just set the promise with error because it is illegal to leave it unset on
+ // destruction.
+ invariant(_canReleaseCriticalSectionPromise);
+ if (!_canReleaseCriticalSectionPromise->getFuture().isReady()) {
+ _canReleaseCriticalSectionPromise->setError(
+ {ErrorCodes::CallbackCanceled, "explicitly breaking release critical section promise"});
+ }
+ _canReleaseCriticalSectionPromise.reset();
+
+ invariant(_migrateThreadFinishedPromise);
+ _migrateThreadFinishedPromise->emplaceValue(_state);
+ _migrateThreadFinishedPromise.reset();
}
void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
@@ -1592,10 +1643,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
"Persisted migration recipient recovery document",
"sessionId"_attr = _sessionId);
- // Enter critical section
+ // Enter critical section. Ensure it has been majority committed before
+ // _recvChunkCommit returns success to the donor, so that if the recipient steps
+ // down, the critical section is kept taken while the donor commits the migration.
RecoverableCriticalSectionService::get(opCtx)
->acquireRecoverableCriticalSectionBlockWrites(
- opCtx, _nss, critSecReason, ShardingCatalogClient::kLocalWriteConcern);
+ opCtx, _nss, critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
LOGV2(5899114, "Entered migration recipient critical section", logAttrs(_nss));
timeInCriticalSection.emplace();
@@ -1614,6 +1667,41 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
}
}
}
+ } else {
+ // We can only ever be in this path if the recipient critical section feature is enabled.
+ invariant(_acquireCSOnRecipient);
+
+ outerOpCtx->setAlwaysInterruptAtStepDownOrUp();
+ auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillableByStepdown(lk);
+ }
+ AlternativeClientRegion acr(newClient);
+ auto executor =
+ Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto newOpCtxPtr = CancelableOperationContext(
+ cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor);
+ auto opCtx = newOpCtxPtr.get();
+
+ RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx,
+ _nss,
+ criticalSectionReason(*_sessionId),
+ ShardingCatalogClient::kMajorityWriteConcern);
+
+ LOGV2_DEBUG(6064501,
+ 2,
+ "Reacquired migration recipient critical section",
+ "sessionId"_attr = *_sessionId);
+
+ {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _state = kEnteredCritSec;
+ _stateChangedCV.notify_all();
+ }
+
+ LOGV2(6064503, "Recovered migration recipient", "sessionId"_attr = *_sessionId);
}
if (_acquireCSOnRecipient) {
@@ -1795,35 +1883,33 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi
OperationContext* opCtx, const Timer& timeInCriticalSection) {
// Wait until the migrate thread is signaled to release the critical section
LOGV2_DEBUG(5899111, 3, "Waiting for release critical section signal");
- {
- stdx::unique_lock<Latch> lock(_mutex);
- opCtx->waitForConditionOrInterrupt(
- _stateChangedCV, lock, [&]() { return _state != kEnteredCritSec; });
- }
+ invariant(_canReleaseCriticalSectionPromise);
+ _canReleaseCriticalSectionPromise->getFuture().get(opCtx);
- invariant(_state == kExitCritSec || _state == kFail || _state == kAbort);
+ _setState(kExitCritSec);
// Refresh the filtering metadata
- if (_state == kExitCritSec) {
- LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section");
+ LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section");
- try {
- if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) {
- uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint");
- }
+ bool refreshFailed = false;
+ try {
+ if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) {
+ uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint");
+ }
- forceShardFilteringMetadataRefresh(opCtx, _nss);
- } catch (const DBException& ex) {
- LOGV2_DEBUG(5899103,
- 2,
- "Post-migration commit refresh failed on recipient",
- "migrationId"_attr = _migrationId,
- "error"_attr = redact(ex));
+ forceShardFilteringMetadataRefresh(opCtx, _nss);
+ } catch (const DBException& ex) {
+ LOGV2_DEBUG(5899103,
+ 2,
+ "Post-migration commit refresh failed on recipient",
+ "migrationId"_attr = _migrationId,
+ "error"_attr = redact(ex));
+ refreshFailed = true;
+ }
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
- CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx);
- }
+ if (refreshFailed) {
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx);
}
// Release the critical section
@@ -1853,4 +1939,28 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi
migrationutil::deleteMigrationRecipientRecoveryDocument(opCtx, *_migrationId);
}
+void MigrationDestinationManager::onStepUpBegin(OperationContext* opCtx, long long term) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ auto newCancellationSource = CancellationSource();
+ std::swap(_cancellationSource, newCancellationSource);
+}
+
+void MigrationDestinationManager::onStepDown() {
+ boost::optional<SharedSemiFuture<State>> migrateThreadFinishedFuture;
+ {
+ stdx::lock_guard<Latch> sl(_mutex);
+ // Cancel any migrateThread work.
+ _cancellationSource.cancel();
+
+ if (_migrateThreadFinishedPromise) {
+ migrateThreadFinishedFuture = _migrateThreadFinishedPromise->getFuture();
+ }
+ }
+
+ // Wait for the migrateThread to finish.
+ if (migrateThreadFinishedFuture) {
+ migrateThreadFinishedFuture->wait();
+ }
+}
+
} // namespace mongo
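
The condition-variable handshake that previously coordinated exitCriticalSection and the migrate thread is replaced above by two shared promises. A rough standalone analogue of that handshake, using standard C++ futures rather than MongoDB's SharedPromise type, is sketched here: the command thread fulfills a "can release" promise and then blocks on a "thread finished" future, while the migrate thread does the reverse.

```cpp
#include <future>
#include <iostream>
#include <thread>

enum class State { kDone, kFail };

int main() {
    // Promise the donor-driven command fulfills to let the migrate thread
    // release the critical section.
    std::promise<void> canRelease;
    std::shared_future<void> canReleaseFuture = canRelease.get_future().share();

    // Promise the migrate thread fulfills with its final state when it exits.
    std::promise<State> threadFinished;
    std::shared_future<State> threadFinishedFuture = threadFinished.get_future().share();

    std::thread migrateThread([&] {
        // ... clone data, enter the critical section ...
        canReleaseFuture.wait();  // Block until signaled by the command thread.
        // ... refresh filtering metadata, release the critical section ...
        threadFinished.set_value(State::kDone);
    });

    // exitCriticalSection(): signal the migrate thread, then wait for it to finish.
    canRelease.set_value();
    const State finalState = threadFinishedFuture.get();
    migrateThread.join();

    std::cout << (finalState == State::kDone ? "released" : "failed") << std::endl;
    return finalState == State::kDone ? 0 : 1;
}
```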
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index c1421fc51ea..925296091cb 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -37,6 +37,7 @@
#include "mongo/bson/oid.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/migration_recipient_recovery_document_gen.h"
#include "mongo/db/s/migration_session_id.h"
@@ -46,6 +47,7 @@
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/cancellation.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/timer.h"
@@ -70,7 +72,8 @@ struct CollectionOptionsAndIndexes {
/**
* Drives the receiving side of the MongoD migration process. One instance exists per shard.
*/
-class MigrationDestinationManager {
+class MigrationDestinationManager
+ : public ReplicaSetAwareServiceShardSvr<MigrationDestinationManager> {
MigrationDestinationManager(const MigrationDestinationManager&) = delete;
MigrationDestinationManager& operator=(const MigrationDestinationManager&) = delete;
@@ -94,6 +97,7 @@ public:
/**
* Returns the singleton instance of the migration destination manager.
*/
+ static MigrationDestinationManager* get(ServiceContext* serviceContext);
static MigrationDestinationManager* get(OperationContext* opCtx);
State getState() const;
@@ -211,7 +215,7 @@ private:
/**
* Thread which drives the migration apply process on the recipient side.
*/
- void _migrateThread(bool skipToCritSecTaken = false);
+ void _migrateThread(CancellationToken cancellationToken, bool skipToCritSecTaken = false);
void _migrateDriver(OperationContext* opCtx, bool skipToCritSecTaken = false);
@@ -255,6 +259,17 @@ private:
void awaitCriticalSectionReleaseSignalAndCompleteMigration(OperationContext* opCtx,
const Timer& timeInCriticalSection);
+ /**
+ * ReplicaSetAwareService entry points.
+ */
+ void onStartup(OperationContext* opCtx) final {}
+ void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) final {}
+ void onShutdown() final {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) final;
+ void onStepUpComplete(OperationContext* opCtx, long long term) final {}
+ void onStepDown() final;
+ void onBecomeArbiter() final {}
+
// Mutex to guard all fields
mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationDestinationManager::_mutex");
@@ -303,6 +318,17 @@ private:
stdx::condition_variable _stateChangedCV;
bool _acquireCSOnRecipient{false};
+
+ // Promise that will be fulfilled when the donor has signaled us that we can release the
+ // critical section.
+ std::unique_ptr<SharedPromise<void>> _canReleaseCriticalSectionPromise;
+
+ // Promise that will be fulfilled when the migrateThread has finished its work.
+ std::unique_ptr<SharedPromise<State>> _migrateThreadFinishedPromise;
+
+ // Cancellation source that is cancelled on stepdowns. On stepup, a new cancellation source will
+ // be installed.
+ CancellationSource _cancellationSource;
};
} // namespace mongo
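
The new ReplicaSetAwareService hooks follow a simple pattern: onStepUpBegin installs a fresh cancellation source for the new term, while onStepDown cancels the current one and then waits, outside the mutex, for the migrate thread to finish. A rough standalone analogue using C++20 std::stop_source in place of MongoDB's CancellationSource is shown below; the class and member names are illustrative, not the real API.

```cpp
#include <future>
#include <mutex>
#include <optional>
#include <stop_token>

class RecipientService {
public:
    // onStepUpBegin: install a fresh cancellation source for the new term.
    void onStepUpBegin() {
        std::lock_guard<std::mutex> lk(_mutex);
        _source = std::stop_source();
    }

    // onStepDown: cancel in-flight work under the mutex, then wait for the
    // worker to drain outside of it.
    void onStepDown() {
        std::optional<std::shared_future<void>> finished;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _source.request_stop();
            if (_workerFinished.valid())
                finished = _workerFinished;
        }
        if (finished)
            finished->wait();
    }

private:
    std::mutex _mutex;
    std::stop_source _source;                  // Analogue of _cancellationSource.
    std::shared_future<void> _workerFinished;  // Analogue of the migrate-thread-finished future.
};

int main() {
    RecipientService svc;
    svc.onStepUpBegin();
    svc.onStepDown();  // No worker registered: cancels and returns immediately.
}
```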
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index e4cd3ce5683..ec63e7477af 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -198,7 +198,8 @@ void sendWriteCommandToRecipient(OperationContext* opCtx,
void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
OperationContext* opCtx,
StringData taskDescription,
- std::function<void(OperationContext*)> doWork) {
+ std::function<void(OperationContext*)> doWork,
+ boost::optional<Backoff> backoff = boost::none) {
const std::string newClientName = "{}-{}"_format(getThreadName(), taskDescription);
const auto initialTerm = repl::ReplicationCoordinator::get(opCtx)->getTerm();
@@ -236,6 +237,10 @@ void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
doWork(newOpCtx.get());
break;
} catch (DBException& ex) {
+ if (backoff) {
+ sleepFor(backoff->nextSleep());
+ }
+
if (attempt % kLogRetryAttemptThreshold == 1) {
LOGV2_WARNING(23937,
"Retrying task after failed attempt",
@@ -1232,8 +1237,9 @@ ExecutorFuture<void> launchReleaseCriticalSectionOnRecipientFuture(
"shardId"_attr = recipientShardId,
"sessionId"_attr = sessionId,
"error"_attr = exShardNotFound);
- };
- });
+ }
+ },
+ Backoff(Seconds(1), Milliseconds::max()));
});
}
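
retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown now accepts an optional Backoff and sleeps between failed attempts, and launchReleaseCriticalSectionOnRecipientFuture passes one with a one-second starting delay. A generic retry-with-backoff sketch in standard C++ illustrates the idea; the retryWithBackoff helper and its doubling-with-cap policy are assumptions for illustration, not the MongoDB utility.

```cpp
#include <algorithm>
#include <chrono>
#include <functional>
#include <stdexcept>
#include <thread>

// Retry doWork until it stops throwing, sleeping between attempts with a
// doubling delay capped at `max` (the cap here is an arbitrary choice).
void retryWithBackoff(const std::function<void()>& doWork,
                      std::chrono::milliseconds initial = std::chrono::seconds(1),
                      std::chrono::milliseconds max = std::chrono::minutes(1)) {
    auto delay = initial;
    while (true) {
        try {
            doWork();
            return;
        } catch (...) {
            std::this_thread::sleep_for(delay);
            delay = std::min(delay * 2, max);
        }
    }
}

int main() {
    int attempts = 0;
    retryWithBackoff([&] {
        if (++attempts < 3)
            throw std::runtime_error("transient failure");
    });
    return attempts == 3 ? 0 : 1;
}
```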
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 5505acde25e..f00a2215460 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -37,6 +37,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/connection_string.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/logical_session_id.h"
@@ -199,7 +200,9 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
* information. The new oplogEntry will also link to prePostImageTs if not null.
*/
ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
- const ProcessOplogResult& lastResult) {
+ const ProcessOplogResult& lastResult,
+ ServiceContext* serviceContext,
+ CancellationToken cancellationToken) {
auto oplogEntry = parseOplog(oplogBSON);
ProcessOplogResult result;
@@ -245,7 +248,9 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
const auto stmtIds = oplogEntry.getStatementIds();
- auto uniqueOpCtx = cc().makeOperationContext();
+ auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
+ auto uniqueOpCtx =
+ CancelableOperationContext(cc().makeOperationContext(), cancellationToken, executor);
auto opCtx = uniqueOpCtx.get();
opCtx->setLogicalSessionId(result.sessionId);
opCtx->setTxnNumber(result.txnNum);
@@ -342,10 +347,14 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
} // namespace
SessionCatalogMigrationDestination::SessionCatalogMigrationDestination(
- NamespaceString nss, ShardId fromShard, MigrationSessionId migrationSessionId)
+ NamespaceString nss,
+ ShardId fromShard,
+ MigrationSessionId migrationSessionId,
+ CancellationToken cancellationToken)
: _nss(std::move(nss)),
_fromShard(std::move(fromShard)),
- _migrationSessionId(std::move(migrationSessionId)) {}
+ _migrationSessionId(std::move(migrationSessionId)),
+ _cancellationToken(std::move(cancellationToken)) {}
SessionCatalogMigrationDestination::~SessionCatalogMigrationDestination() {
if (_thread.joinable()) {
@@ -427,7 +436,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
BSONObj nextBatch;
BSONArray oplogArray;
{
- auto uniqueCtx = cc().makeOperationContext();
+ auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor();
+ auto uniqueCtx = CancelableOperationContext(
+ cc().makeOperationContext(), _cancellationToken, executor);
auto opCtx = uniqueCtx.get();
nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
@@ -482,7 +493,8 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
for (BSONArrayIteratorSorted oplogIter(oplogArray); oplogIter.more();) {
try {
- lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult);
+ lastResult = processSessionOplog(
+ oplogIter.next().Obj(), lastResult, service, _cancellationToken);
} catch (const ExceptionFor<ErrorCodes::TransactionTooOld>&) {
// This means that the server has a newer txnNumber than the oplog being
// migrated, so just skip it
@@ -492,7 +504,11 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
WriteConcernResult unusedWCResult;
- auto uniqueOpCtx = cc().makeOperationContext();
+
+ auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor();
+ auto uniqueOpCtx =
+ CancelableOperationContext(cc().makeOperationContext(), _cancellationToken, executor);
+
uassertStatusOK(
waitForWriteConcern(uniqueOpCtx.get(), lastResult.oplogTime, kMajorityWC, &unusedWCResult));
diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h
index 890e3eec033..0dededc6699 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.h
+++ b/src/mongo/db/s/session_catalog_migration_destination.h
@@ -40,6 +40,7 @@
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
+#include "mongo/util/cancellation.h"
#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
@@ -69,7 +70,8 @@ public:
SessionCatalogMigrationDestination(NamespaceString nss,
ShardId fromShard,
- MigrationSessionId migrationSessionId);
+ MigrationSessionId migrationSessionId,
+ CancellationToken cancellationToken);
~SessionCatalogMigrationDestination();
/**
@@ -113,6 +115,7 @@ private:
const NamespaceString _nss;
const ShardId _fromShard;
const MigrationSessionId _migrationSessionId;
+ const CancellationToken _cancellationToken;
stdx::thread _thread;
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 0b634db0ff6..d1e5a11848c 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -281,6 +281,8 @@ public:
->setFilteringMetadata(operationContext(), CollectionMetadata());
}
+ CancellationSource _cancellationSource;
+
private:
std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient() override {
class StaticCatalogClient final : public ShardingCatalogClientMock {
@@ -328,7 +330,8 @@ private:
};
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTransfer) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
finishSessionExpectSuccess(&sessionMigration);
@@ -337,7 +340,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTr
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -397,7 +401,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithMultiStmtIds) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -460,7 +465,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithMultiStmtIds) {
TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -515,7 +521,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -579,7 +586,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
const auto sessionId1 = makeLogicalSessionIdForTest();
const auto sessionId2 = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo1;
@@ -662,7 +670,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -727,7 +736,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -819,7 +829,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPreImageFindAndModify) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -913,7 +924,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPreImag
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -1004,7 +1016,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPostImageFindAndModify) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -1096,7 +1109,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPostIma
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -1202,7 +1216,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
<< "newerSess"),
0);
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo oldSessionInfo;
@@ -1253,7 +1268,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
auto opCtx = operationContext();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo oldSessionInfo;
@@ -1310,7 +1326,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1325,7 +1342,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkErr
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoOplog) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1340,7 +1358,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithBadOplogFormat) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1359,7 +1378,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoSessionId) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1384,7 +1404,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoTxnNumber) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1408,7 +1429,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoStmtId) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1438,7 +1460,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
const auto sessionId = makeLogicalSessionIdForTest();
auto opCtx = operationContext();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
OperationSessionInfo sessionInfo;
@@ -1503,7 +1526,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -1541,7 +1565,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImage
TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorForPreImageOplogWithNonMatchingSessionId) {
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -1582,7 +1607,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNonMatchingTxnNum) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -1624,7 +1650,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorIfPreImageOplogFollowWithOplogWithNoPreImageLink) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -1663,7 +1690,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorIfOplogWithPreImageLinkIsPrecededByNormalOplog) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -1704,7 +1732,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ShouldErrorIfOplogWithPostImageLinkIsPrecededByNormalOplog) {
const auto sessionId = makeLogicalSessionIdForTest();
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
@@ -1751,7 +1780,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30);
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
auto oplog1 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
@@ -1834,7 +1864,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
Date_t::now(), // wall clock time
{5})}; // statement ids
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -1914,7 +1945,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
Date_t::now(), // wall clock time
{55})}; // statement ids
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
@@ -2008,7 +2040,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, MigratingKnownStmtWhileOplogTrunc
Date_t::now(), // wall clock time
{kStmtId}); // statement ids
- SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
+ SessionCatalogMigrationDestination sessionMigration(
+ kNs, kFromShard, migrationId(), _cancellationSource.token());
sessionMigration.start(getServiceContext());
sessionMigration.finish();