summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_destination_manager.cpp
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2020-07-08 23:25:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-16 14:06:54 +0000
commit21b083c7352704fc8c3d8a4f33c54040259ff766 (patch)
tree333f80cff7fb94bf382d01be4c1a46f1e9d6b70a /src/mongo/db/s/migration_destination_manager.cpp
parent4bfdc5ddfc4ad5569cf995e734c4b2efe77f769a (diff)
downloadmongo-21b083c7352704fc8c3d8a4f33c54040259ff766.tar.gz
SERVER-48641 SERVER-48689 Yield session in migration destination driver when waiting on replication and session migration
Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp112
1 files changed, 90 insertions, 22 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index e18a250adf5..1cfcfb67d4b 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -91,6 +91,46 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
-1);
+void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
+ MongoDOperationContextSession::checkOut(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
+ *opCtx->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+}
+
+template <typename Callable>
+constexpr bool returnsVoid() {
+ return std::is_void_v<std::invoke_result_t<Callable>>;
+}
+
+// Yields the checked out session before running the given function. If the function runs without
+// throwing, will reacquire the session and verify it is still valid to proceed with the migration.
+template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
+auto runWithoutSession(OperationContext* opCtx, Callable&& callable) {
+ MongoDOperationContextSession::checkIn(opCtx);
+
+ auto retVal = callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+
+ return retVal;
+}
+
+// Same as runWithoutSession above but takes a void function.
+template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
+void runWithoutSession(OperationContext* opCtx, Callable&& callable) {
+ MongoDOperationContextSession::checkIn(opCtx);
+
+ callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+}
+
/**
* Returns a human-readabale name of the migration manager's state.
*/
@@ -771,8 +811,10 @@ void MigrationDestinationManager::_migrateThread() {
// duration of the recipient's side of the migration. This guarantees that if the
// donor shard has failed over, then the new donor primary cannot bump the
// txnNumber on this session while this node is still executing the recipient side
- //(which is important because otherwise, this node may create orphans after the
- // range deletion task on this node has been processed).
+ // (which is important because otherwise, this node may create orphans after the
+ // range deletion task on this node has been processed). The recipient will periodically
+ // yield this session, but will verify the txnNumber has not changed before continuing,
+ // preserving the guarantee that orphans cannot be created after the txnNumber is advanced.
opCtx->setLogicalSessionId(_lsid);
opCtx->setTxnNumber(_txnNumber);
@@ -875,7 +917,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
range,
CleanWhenEnum::kNow);
recipientDeletionTask.setPending(true);
- migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask);
+
+ // It is illegal to wait for write concern with a session checked out, so persist the range
+ // deletion task with an immediately satsifiable write concern and then wait for majority
+ // after yielding the session.
+ migrationutil::persistRangeDeletionTaskLocally(
+ outerOpCtx, recipientDeletionTask, WriteConcernOptions());
+
+ runWithoutSession(outerOpCtx, [&] {
+ WriteConcernResult ignoreResult;
+ auto latestOpTime =
+ repl::ReplClientInfo::forClient(outerOpCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(
+ outerOpCtx, latestOpTime, WriteConcerns::kMajorityWriteConcern, &ignoreResult));
+ });
timing.done(1);
migrateThreadHangAtStep1.pauseWhileSet();
@@ -918,6 +973,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
auto assertNotAborted = [&](OperationContext* opCtx) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
uassert(50748, "Migration aborted while copying documents", getState() != ABORT);
};
@@ -962,18 +1018,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
_clonedBytes += batchClonedBytes;
}
if (_writeConcern.needToWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _migrationId.toBSON());
- } else {
- uassertStatusOK(replStatus.status);
- }
+ runWithoutSession(outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(
+ 22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId.toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
}
sleepmillis(migrateCloneInsertionBatchDelayMS.load());
@@ -1042,6 +1101,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
int i;
for (i = 0; i < maxIterations; i++) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
if (getState() == ABORT) {
LOGV2(22002,
@@ -1050,8 +1110,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return;
}
- if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern))
+ if (runWithoutSession(outerOpCtx, [&] {
+ return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern);
+ })) {
break;
+ }
if (i > 100) {
LOGV2(22003,
@@ -1085,10 +1148,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
"Starting majority commit wait on recipient",
"migrationId"_attr = _migrationId.toBSON());
- auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx, lastOpApplied, _writeConcern);
- uassertStatusOKWithContext(awaitReplicationResult.status,
- awaitReplicationResult.status.codeString());
+ runWithoutSession(outerOpCtx, [&] {
+ auto awaitReplicationResult =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx, lastOpApplied, _writeConcern);
+ uassertStatusOKWithContext(awaitReplicationResult.status,
+ awaitReplicationResult.status.codeString());
+ });
LOGV2(22005,
"Chunk data replicated successfully.",
@@ -1107,6 +1173,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
bool transferAfterCommit = false;
while (getState() == STEADY || getState() == COMMIT_START) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
// Make sure we do at least one transfer after recv'ing the commit message. If we
// aren't sure that at least one transfer happens *after* our state changes to
@@ -1144,7 +1211,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (_flushPendingWrites(opCtx, lastOpApplied)) {
+ if (runWithoutSession(outerOpCtx,
+ [&] { return _flushPendingWrites(opCtx, lastOpApplied); })) {
break;
}
}
@@ -1163,7 +1231,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
migrateThreadHangAtStep5.pauseWhileSet();
}
- _sessionMigration->join();
+ runWithoutSession(outerOpCtx, [&] { _sessionMigration->join(); });
if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
_setStateFail(redact(_sessionMigration->getErrMsg()));
return;