summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp5
-rw-r--r--src/mongo/db/s/migration_coordinator.cpp3
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp112
-rw-r--r--src/mongo/db/s/migration_util.cpp5
-rw-r--r--src/mongo/db/s/migration_util.h3
5 files changed, 102 insertions, 26 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 124e3aa1cd2..20480170b71 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -87,6 +87,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/vector_clock.h"
@@ -1941,6 +1942,10 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
+ // It is illegal to wait for replication with a session checked out because it can lead to
+ // deadlocks.
+ invariant(OperationContextSession::get(opCtx) == nullptr);
+
Timer timer;
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index b6d45d143dc..a4fe84afb6d 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -123,7 +123,8 @@ void MigrationCoordinator::startMigration(OperationContext* opCtx) {
_waitForDelete ? CleanWhenEnum::kNow
: CleanWhenEnum::kDelayed);
donorDeletionTask.setPending(true);
- migrationutil::persistRangeDeletionTaskLocally(opCtx, donorDeletionTask);
+ migrationutil::persistRangeDeletionTaskLocally(
+ opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern);
}
void MigrationCoordinator::setMigrationDecision(Decision decision) {
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index e18a250adf5..1cfcfb67d4b 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -91,6 +91,46 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
-1);
+void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
+ MongoDOperationContextSession::checkOut(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
+ *opCtx->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+}
+
+template <typename Callable>
+constexpr bool returnsVoid() {
+ return std::is_void_v<std::invoke_result_t<Callable>>;
+}
+
+// Yields the checked out session before running the given function. If the function runs without
+// throwing, will reacquire the session and verify it is still valid to proceed with the migration.
+template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
+auto runWithoutSession(OperationContext* opCtx, Callable&& callable) {
+ MongoDOperationContextSession::checkIn(opCtx);
+
+ auto retVal = callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+
+ return retVal;
+}
+
+// Same as runWithoutSession above but takes a void function.
+template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
+void runWithoutSession(OperationContext* opCtx, Callable&& callable) {
+ MongoDOperationContextSession::checkIn(opCtx);
+
+ callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+ checkOutSessionAndVerifyTxnState(opCtx);
+}
+
/**
* Returns a human-readabale name of the migration manager's state.
*/
@@ -771,8 +811,10 @@ void MigrationDestinationManager::_migrateThread() {
// duration of the recipient's side of the migration. This guarantees that if the
// donor shard has failed over, then the new donor primary cannot bump the
// txnNumber on this session while this node is still executing the recipient side
- //(which is important because otherwise, this node may create orphans after the
- // range deletion task on this node has been processed).
+ // (which is important because otherwise, this node may create orphans after the
+ // range deletion task on this node has been processed). The recipient will periodically
+ // yield this session, but will verify the txnNumber has not changed before continuing,
+ // preserving the guarantee that orphans cannot be created after the txnNumber is advanced.
opCtx->setLogicalSessionId(_lsid);
opCtx->setTxnNumber(_txnNumber);
@@ -875,7 +917,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
range,
CleanWhenEnum::kNow);
recipientDeletionTask.setPending(true);
- migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask);
+
+ // It is illegal to wait for write concern with a session checked out, so persist the range
+ // deletion task with an immediately satsifiable write concern and then wait for majority
+ // after yielding the session.
+ migrationutil::persistRangeDeletionTaskLocally(
+ outerOpCtx, recipientDeletionTask, WriteConcernOptions());
+
+ runWithoutSession(outerOpCtx, [&] {
+ WriteConcernResult ignoreResult;
+ auto latestOpTime =
+ repl::ReplClientInfo::forClient(outerOpCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(
+ outerOpCtx, latestOpTime, WriteConcerns::kMajorityWriteConcern, &ignoreResult));
+ });
timing.done(1);
migrateThreadHangAtStep1.pauseWhileSet();
@@ -918,6 +973,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
auto assertNotAborted = [&](OperationContext* opCtx) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
uassert(50748, "Migration aborted while copying documents", getState() != ABORT);
};
@@ -962,18 +1018,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
_clonedBytes += batchClonedBytes;
}
if (_writeConcern.needToWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _migrationId.toBSON());
- } else {
- uassertStatusOK(replStatus.status);
- }
+ runWithoutSession(outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(
+ 22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId.toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
}
sleepmillis(migrateCloneInsertionBatchDelayMS.load());
@@ -1042,6 +1101,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
int i;
for (i = 0; i < maxIterations; i++) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
if (getState() == ABORT) {
LOGV2(22002,
@@ -1050,8 +1110,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return;
}
- if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern))
+ if (runWithoutSession(outerOpCtx, [&] {
+ return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern);
+ })) {
break;
+ }
if (i > 100) {
LOGV2(22003,
@@ -1085,10 +1148,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
"Starting majority commit wait on recipient",
"migrationId"_attr = _migrationId.toBSON());
- auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx, lastOpApplied, _writeConcern);
- uassertStatusOKWithContext(awaitReplicationResult.status,
- awaitReplicationResult.status.codeString());
+ runWithoutSession(outerOpCtx, [&] {
+ auto awaitReplicationResult =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx, lastOpApplied, _writeConcern);
+ uassertStatusOKWithContext(awaitReplicationResult.status,
+ awaitReplicationResult.status.codeString());
+ });
LOGV2(22005,
"Chunk data replicated successfully.",
@@ -1107,6 +1173,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
bool transferAfterCommit = false;
while (getState() == STEADY || getState() == COMMIT_START) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
// Make sure we do at least one transfer after recv'ing the commit message. If we
// aren't sure that at least one transfer happens *after* our state changes to
@@ -1144,7 +1211,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (_flushPendingWrites(opCtx, lastOpApplied)) {
+ if (runWithoutSession(outerOpCtx,
+ [&] { return _flushPendingWrites(opCtx, lastOpApplied); })) {
break;
}
}
@@ -1163,7 +1231,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
migrateThreadHangAtStep5.pauseWhileSet();
}
- _sessionMigration->join();
+ runWithoutSession(outerOpCtx, [&] { _sessionMigration->join(); });
if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
_setStateFail(redact(_sessionMigration->getErrMsg()));
return;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 4a57fae8c4d..f55c91a9fa3 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -579,10 +579,11 @@ void persistMigrationCoordinatorLocally(OperationContext* opCtx,
}
void persistRangeDeletionTaskLocally(OperationContext* opCtx,
- const RangeDeletionTask& deletionTask) {
+ const RangeDeletionTask& deletionTask,
+ const WriteConcernOptions& writeConcern) {
PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
try {
- store.add(opCtx, deletionTask);
+ store.add(opCtx, deletionTask, writeConcern);
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
// Convert a DuplicateKey error to an anonymous error.
uasserted(31375,
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 5f6eaafc7ae..f5e2b05ee58 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -138,7 +138,8 @@ void persistMigrationCoordinatorLocally(OperationContext* opCtx,
* concern.
*/
void persistRangeDeletionTaskLocally(OperationContext* opCtx,
- const RangeDeletionTask& deletionTask);
+ const RangeDeletionTask& deletionTask,
+ const WriteConcernOptions& writeConcern);
/**
* Updates the migration coordinator document to set the decision field to "committed" and waits for