diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 112 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.h | 3 |
5 files changed, 102 insertions, 26 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 124e3aa1cd2..20480170b71 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -87,6 +87,7 @@ #include "mongo/db/repl/update_position_args.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/server_options.h" +#include "mongo/db/session_catalog.h" #include "mongo/db/shutdown_in_progress_quiesce_info.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/vector_clock.h" @@ -1941,6 +1942,10 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication( OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { + // It is illegal to wait for replication with a session checked out because it can lead to + // deadlocks. + invariant(OperationContextSession::get(opCtx) == nullptr); + Timer timer; WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index b6d45d143dc..a4fe84afb6d 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -123,7 +123,8 @@ void MigrationCoordinator::startMigration(OperationContext* opCtx) { _waitForDelete ? CleanWhenEnum::kNow : CleanWhenEnum::kDelayed); donorDeletionTask.setPending(true); - migrationutil::persistRangeDeletionTaskLocally(opCtx, donorDeletionTask); + migrationutil::persistRangeDeletionTaskLocally( + opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern); } void MigrationCoordinator::setMigrationDecision(Decision decision) { diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index e18a250adf5..1cfcfb67d4b 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -91,6 +91,46 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, -1); +void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) { + MongoDOperationContextSession::checkOut(opCtx); + TransactionParticipant::get(opCtx).beginOrContinue(opCtx, + *opCtx->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */); +} + +template <typename Callable> +constexpr bool returnsVoid() { + return std::is_void_v<std::invoke_result_t<Callable>>; +} + +// Yields the checked out session before running the given function. If the function runs without +// throwing, will reacquire the session and verify it is still valid to proceed with the migration. +template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0> +auto runWithoutSession(OperationContext* opCtx, Callable&& callable) { + MongoDOperationContextSession::checkIn(opCtx); + + auto retVal = callable(); + + // The below code can throw, so it cannot run in a scope guard. + opCtx->checkForInterrupt(); + checkOutSessionAndVerifyTxnState(opCtx); + + return retVal; +} + +// Same as runWithoutSession above but takes a void function. +template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0> +void runWithoutSession(OperationContext* opCtx, Callable&& callable) { + MongoDOperationContextSession::checkIn(opCtx); + + callable(); + + // The below code can throw, so it cannot run in a scope guard. + opCtx->checkForInterrupt(); + checkOutSessionAndVerifyTxnState(opCtx); +} + /** * Returns a human-readabale name of the migration manager's state. */ @@ -771,8 +811,10 @@ void MigrationDestinationManager::_migrateThread() { // duration of the recipient's side of the migration. This guarantees that if the // donor shard has failed over, then the new donor primary cannot bump the // txnNumber on this session while this node is still executing the recipient side - //(which is important because otherwise, this node may create orphans after the - // range deletion task on this node has been processed). + // (which is important because otherwise, this node may create orphans after the + // range deletion task on this node has been processed). The recipient will periodically + // yield this session, but will verify the txnNumber has not changed before continuing, + // preserving the guarantee that orphans cannot be created after the txnNumber is advanced. opCtx->setLogicalSessionId(_lsid); opCtx->setTxnNumber(_txnNumber); @@ -875,7 +917,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { range, CleanWhenEnum::kNow); recipientDeletionTask.setPending(true); - migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask); + + // It is illegal to wait for write concern with a session checked out, so persist the range + // deletion task with an immediately satsifiable write concern and then wait for majority + // after yielding the session. + migrationutil::persistRangeDeletionTaskLocally( + outerOpCtx, recipientDeletionTask, WriteConcernOptions()); + + runWithoutSession(outerOpCtx, [&] { + WriteConcernResult ignoreResult; + auto latestOpTime = + repl::ReplClientInfo::forClient(outerOpCtx->getClient()).getLastOp(); + uassertStatusOK(waitForWriteConcern( + outerOpCtx, latestOpTime, WriteConcerns::kMajorityWriteConcern, &ignoreResult)); + }); timing.done(1); migrateThreadHangAtStep1.pauseWhileSet(); @@ -918,6 +973,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { auto assertNotAborted = [&](OperationContext* opCtx) { opCtx->checkForInterrupt(); + outerOpCtx->checkForInterrupt(); uassert(50748, "Migration aborted while copying documents", getState() != ABORT); }; @@ -962,18 +1018,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { _clonedBytes += batchClonedBytes; } if (_writeConcern.needToWaitForOtherNodes()) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - _writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - LOGV2_WARNING(22011, - "secondaryThrottle on, but doc insert timed out; continuing", - "migrationId"_attr = _migrationId.toBSON()); - } else { - uassertStatusOK(replStatus.status); - } + runWithoutSession(outerOpCtx, [&] { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + _writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + LOGV2_WARNING( + 22011, + "secondaryThrottle on, but doc insert timed out; continuing", + "migrationId"_attr = _migrationId.toBSON()); + } else { + uassertStatusOK(replStatus.status); + } + }); } sleepmillis(migrateCloneInsertionBatchDelayMS.load()); @@ -1042,6 +1101,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { int i; for (i = 0; i < maxIterations; i++) { opCtx->checkForInterrupt(); + outerOpCtx->checkForInterrupt(); if (getState() == ABORT) { LOGV2(22002, @@ -1050,8 +1110,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { return; } - if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) + if (runWithoutSession(outerOpCtx, [&] { + return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern); + })) { break; + } if (i > 100) { LOGV2(22003, @@ -1085,10 +1148,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { "Starting majority commit wait on recipient", "migrationId"_attr = _migrationId.toBSON()); - auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, lastOpApplied, _writeConcern); - uassertStatusOKWithContext(awaitReplicationResult.status, - awaitReplicationResult.status.codeString()); + runWithoutSession(outerOpCtx, [&] { + auto awaitReplicationResult = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, lastOpApplied, _writeConcern); + uassertStatusOKWithContext(awaitReplicationResult.status, + awaitReplicationResult.status.codeString()); + }); LOGV2(22005, "Chunk data replicated successfully.", @@ -1107,6 +1173,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { bool transferAfterCommit = false; while (getState() == STEADY || getState() == COMMIT_START) { opCtx->checkForInterrupt(); + outerOpCtx->checkForInterrupt(); // Make sure we do at least one transfer after recv'ing the commit message. If we // aren't sure that at least one transfer happens *after* our state changes to @@ -1144,7 +1211,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { // 1) The from side has told us that it has locked writes (COMMIT_START) // 2) We've checked at least one more time for un-transmitted mods if (getState() == COMMIT_START && transferAfterCommit == true) { - if (_flushPendingWrites(opCtx, lastOpApplied)) { + if (runWithoutSession(outerOpCtx, + [&] { return _flushPendingWrites(opCtx, lastOpApplied); })) { break; } } @@ -1163,7 +1231,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { migrateThreadHangAtStep5.pauseWhileSet(); } - _sessionMigration->join(); + runWithoutSession(outerOpCtx, [&] { _sessionMigration->join(); }); if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) { _setStateFail(redact(_sessionMigration->getErrMsg())); return; diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 4a57fae8c4d..f55c91a9fa3 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -579,10 +579,11 @@ void persistMigrationCoordinatorLocally(OperationContext* opCtx, } void persistRangeDeletionTaskLocally(OperationContext* opCtx, - const RangeDeletionTask& deletionTask) { + const RangeDeletionTask& deletionTask, + const WriteConcernOptions& writeConcern) { PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); try { - store.add(opCtx, deletionTask); + store.add(opCtx, deletionTask, writeConcern); } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) { // Convert a DuplicateKey error to an anonymous error. uasserted(31375, diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h index 5f6eaafc7ae..f5e2b05ee58 100644 --- a/src/mongo/db/s/migration_util.h +++ b/src/mongo/db/s/migration_util.h @@ -138,7 +138,8 @@ void persistMigrationCoordinatorLocally(OperationContext* opCtx, * concern. */ void persistRangeDeletionTaskLocally(OperationContext* opCtx, - const RangeDeletionTask& deletionTask); + const RangeDeletionTask& deletionTask, + const WriteConcernOptions& writeConcern); /** * Updates the migration coordinator document to set the decision field to "committed" and waits for |