author     Jack Mulrow <jack.mulrow@mongodb.com>             2020-07-08 23:25:22 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-08-12 20:58:58 +0000
commit     91f3ad01c5fe5599d9ba679a659745fa3b7eb00b (patch)
tree       388e008a631c8cd236e12950c7ef676e9a47c555
parent     cad3f4c69890907a81872cce2a60949e342a77ce (diff)
download   mongo-91f3ad01c5fe5599d9ba679a659745fa3b7eb00b.tar.gz
SERVER-48641 SERVER-48689 Yield session in migration destination driver when waiting on replication and session migration
(cherry picked from commit 21b083c7352704fc8c3d8a4f33c54040259ff766)
 jstests/concurrency/fsm_workload_helpers/chunks.js          |   9
 jstests/concurrency/fsm_workloads/random_moveChunk_base.js  |   3
 src/mongo/db/repl/replication_coordinator_impl.cpp          |   5
 src/mongo/db/s/migration_coordinator.cpp                    |   3
 src/mongo/db/s/migration_destination_manager.cpp            | 146
 src/mongo/db/s/migration_util.cpp                           |   5
 src/mongo/db/s/migration_util.h                             |   3
 7 files changed, 144 insertions, 30 deletions
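The patch centers on one pattern: the recipient keeps its logical session checked out for the duration of the migration, so every blocking wait (replication, write concern, session migration) must first yield the session and, on resume, reacquire it and verify the txnNumber is unchanged. A minimal sketch of that cycle, assembled from the calls the patch itself introduces (waitForSomething() is a hypothetical placeholder for any of the waits below):

    // Sketch only, not the patch itself: yield around a blocking wait, then reacquire.
    MongoDOperationContextSession::checkIn(opCtx);   // yield the checked-out session

    waitForSomething(opCtx);                         // hypothetical blocking wait

    opCtx->checkForInterrupt();                      // can throw, so it cannot run in a scope guard

    MongoDOperationContextSession::checkOut(opCtx);  // reacquire the session
    TransactionParticipant::get(opCtx).beginOrContinue(
        opCtx,
        *opCtx->getTxnNumber(),                      // fails if a new donor primary bumped it
        boost::none /* autocommit */,
        boost::none /* startTransaction */);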
diff --git a/jstests/concurrency/fsm_workload_helpers/chunks.js b/jstests/concurrency/fsm_workload_helpers/chunks.js
index 3a6d0df64a1..9b49c0ee71a 100644
--- a/jstests/concurrency/fsm_workload_helpers/chunks.js
+++ b/jstests/concurrency/fsm_workload_helpers/chunks.js
@@ -60,7 +60,7 @@ var ChunkHelper = (function() {
return runCommandWithRetries(db, cmd, res => res.code === ErrorCodes.LockBusy);
}
- function moveChunk(db, collName, bounds, toShard, waitForDelete) {
+ function moveChunk(db, collName, bounds, toShard, waitForDelete, secondaryThrottle) {
var cmd = {
moveChunk: db[collName].getFullName(),
bounds: bounds,
@@ -68,6 +68,13 @@ var ChunkHelper = (function() {
_waitForDelete: waitForDelete
};
+ // Using _secondaryThrottle adds coverage for additional waits for write concern on the
+ // recipient during cloning.
+ if (secondaryThrottle) {
+ cmd._secondaryThrottle = true;
+ cmd.writeConcern = {w: "majority"}; // _secondaryThrottle requires a write concern.
+ }
+
const runningWithStepdowns =
TestData.runningWithConfigStepdowns || TestData.runningWithShardStepdowns;
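For context, _secondaryThrottle is what drives the recipient-side waits this patch makes yield-safe: with it set, the recipient's _writeConcern needs acknowledgement from other nodes, so each cloned batch waits on replication. Condensed from the migration_destination_manager.cpp hunk further down (a sketch, with writeConcern standing in for the member variable):

    if (writeConcern.needToWaitForOtherNodes()) {
        auto replStatus = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
            opCtx,
            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
            writeConcern);
        if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
            // Throttle timeouts are tolerated; cloning just continues.
        } else {
            uassertStatusOK(replStatus.status);
        }
    }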
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_base.js b/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
index a35c1ce5c5a..2b0c736cb4e 100644
--- a/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
@@ -93,8 +93,9 @@ var $config = extendWorkload($config, function($config, $super) {
// limited number of retries with exponential backoff.
const bounds = this.calculateChunkBoundsForShardKey(collName, chunk);
const waitForDelete = Random.rand() < 0.5;
+ const secondaryThrottle = Random.rand() < 0.5;
try {
- ChunkHelper.moveChunk(db, collName, bounds, toShard, waitForDelete);
+ ChunkHelper.moveChunk(db, collName, bounds, toShard, waitForDelete, secondaryThrottle);
} catch (e) {
// Failed moveChunks are thrown by the moveChunk helper with the response included as a
// JSON string in the error's message.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 14b7e4d3726..7fffcffaa17 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -91,6 +91,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
@@ -1873,6 +1874,10 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
+ // It is illegal to wait for replication with a session checked out because it can lead to
+ // deadlocks.
+ invariant(OperationContextSession::get(opCtx) == nullptr);
+
Timer timer;
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
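With this invariant, code that reaches awaitReplication() while a session is checked out now fails fast instead of risking a deadlock. A hedged before/after sketch (replCoord, lastOpTime, and writeConcern are illustrative locals, not names from this patch):

    // Would now trip the invariant if a session is checked out:
    // replCoord->awaitReplication(opCtx, lastOpTime, writeConcern);

    // Yield-safe pattern used by the migration code below:
    MongoDOperationContextSession::checkIn(opCtx);
    uassertStatusOK(replCoord->awaitReplication(opCtx, lastOpTime, writeConcern).status);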
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index b6d45d143dc..a4fe84afb6d 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -123,7 +123,8 @@ void MigrationCoordinator::startMigration(OperationContext* opCtx) {
_waitForDelete ? CleanWhenEnum::kNow
: CleanWhenEnum::kDelayed);
donorDeletionTask.setPending(true);
- migrationutil::persistRangeDeletionTaskLocally(opCtx, donorDeletionTask);
+ migrationutil::persistRangeDeletionTaskLocally(
+ opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern);
}
void MigrationCoordinator::setMigrationDecision(Decision decision) {
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 4a7e117a1d4..ba8796113f7 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -90,6 +90,63 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
-1);
+void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
+ MongoDOperationContextSession::checkOut(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
+ *opCtx->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+}
+
+template <typename Callable>
+constexpr bool returnsVoid() {
+ return std::is_void_v<std::invoke_result_t<Callable>>;
+}
+
+// Yields the checked out session before running the given function, if the migration is using the
+// 4.4 range deleter protocol. If the function runs without throwing, will reacquire the session and
+// verify it is still valid to proceed with the migration.
+template <typename Callable, std::enable_if_t<!returnsVoid<Callable>(), int> = 0>
+auto runWithoutSession(OperationContext* opCtx,
+ Callable&& callable,
+ bool useFCV44RangeDeleterProtocol) {
+ // A session is only checked out in the 4.4 protocol.
+ if (useFCV44RangeDeleterProtocol) {
+ MongoDOperationContextSession::checkIn(opCtx);
+ }
+
+ auto retVal = callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+
+ if (useFCV44RangeDeleterProtocol) {
+ checkOutSessionAndVerifyTxnState(opCtx);
+ }
+
+ return retVal;
+}
+
+// Same as runWithoutSession above but takes a void function.
+template <typename Callable, std::enable_if_t<returnsVoid<Callable>(), int> = 0>
+void runWithoutSession(OperationContext* opCtx,
+ Callable&& callable,
+ bool useFCV44RangeDeleterProtocol) {
+ // A session is only checked out in the 4.4 protocol.
+ if (useFCV44RangeDeleterProtocol) {
+ MongoDOperationContextSession::checkIn(opCtx);
+ }
+
+ callable();
+
+ // The below code can throw, so it cannot run in a scope guard.
+ opCtx->checkForInterrupt();
+
+ if (useFCV44RangeDeleterProtocol) {
+ checkOutSessionAndVerifyTxnState(opCtx);
+ }
+}
+
/**
 * Returns a human-readable name of the migration manager's state.
*/
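Both overloads are exercised later in this diff; as a usage illustration (the locals mirror the later call sites):

    // Non-void callable: runWithoutSession() forwards the return value.
    bool caughtUp = runWithoutSession(
        outerOpCtx,
        [&] { return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern); },
        _useFCV44RangeDeleterProtocol);

    // Void callable: dispatched to the second overload via returnsVoid().
    runWithoutSession(
        outerOpCtx, [&] { _sessionMigration->join(); }, _useFCV44RangeDeleterProtocol);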
@@ -788,8 +845,10 @@ void MigrationDestinationManager::_migrateThread() {
// duration of the recipient's side of the migration. This guarantees that if the
// donor shard has failed over, then the new donor primary cannot bump the
// txnNumber on this session while this node is still executing the recipient side
- //(which is important because otherwise, this node may create orphans after the
- // range deletion task on this node has been processed).
+ // (which is important because otherwise, this node may create orphans after the
+ // range deletion task on this node has been processed). The recipient will periodically
+ // yield this session, but will verify the txnNumber has not changed before continuing,
+ // preserving the guarantee that orphans cannot be created after the txnNumber is advanced.
if (_useFCV44RangeDeleterProtocol) {
opCtx->setLogicalSessionId(_lsid);
opCtx->setTxnNumber(_txnNumber);
@@ -905,7 +964,24 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
CleanWhenEnum::kNow);
recipientDeletionTask.setPending(true);
- migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask);
+ // It is illegal to wait for write concern with a session checked out, so persist the
+ // range deletion task with an immediately satisfiable write concern and then wait for
+ // majority after yielding the session.
+ migrationutil::persistRangeDeletionTaskLocally(
+ outerOpCtx, recipientDeletionTask, WriteConcernOptions());
+
+ runWithoutSession(
+ outerOpCtx,
+ [&] {
+ WriteConcernResult ignoreResult;
+ auto latestOpTime =
+ repl::ReplClientInfo::forClient(outerOpCtx->getClient()).getLastOp();
+ uassertStatusOK(waitForWriteConcern(outerOpCtx,
+ latestOpTime,
+ WriteConcerns::kMajorityWriteConcern,
+ &ignoreResult));
+ },
+ _useFCV44RangeDeleterProtocol);
} else {
// Synchronously delete any data which might have been left orphaned in the range
// being moved, and wait for completion
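The distinction being exploited here is between a write concern the local write already satisfies and one that blocks on replication. Mirroring the kMajorityWriteConcern definition earlier in this file (a sketch; the real constants live in the tree):

    // Satisfied immediately by the local write, so safe while the session is checked out.
    const WriteConcernOptions localOnly{};

    // May block on other nodes, so the wait runs inside runWithoutSession() after the
    // session has been yielded, as in the hunk above.
    const WriteConcernOptions majority(WriteConcernOptions::kMajority,
                                       WriteConcernOptions::SyncMode::UNSET,
                                       -1);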
@@ -976,6 +1052,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
auto assertNotAborted = [&](OperationContext* opCtx) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
uassert(50748, "Migration aborted while copying documents", getState() != ABORT);
};
@@ -1020,20 +1097,26 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
_clonedBytes += batchClonedBytes;
}
if (_writeConcern.needToWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- LOGV2_WARNING(22011,
- "secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _useFCV44RangeDeleterProtocol
- ? _migrationId->toBSON()
- : BSONObj());
- } else {
- uassertStatusOK(replStatus.status);
- }
+ runWithoutSession(
+ outerOpCtx,
+ [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(
+ 22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _useFCV44RangeDeleterProtocol
+ ? _migrationId->toBSON()
+ : BSONObj());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ },
+ _useFCV44RangeDeleterProtocol);
}
sleepmillis(migrateCloneInsertionBatchDelayMS.load());
@@ -1102,6 +1185,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
int i;
for (i = 0; i < maxIterations; i++) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
if (getState() == ABORT) {
LOGV2(22002,
@@ -1111,8 +1195,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return;
}
- if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern))
+ if (runWithoutSession(
+ outerOpCtx,
+ [&] { return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern); },
+ _useFCV44RangeDeleterProtocol)) {
break;
+ }
if (i > 100) {
LOGV2(22003,
@@ -1143,10 +1231,16 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
"migrationId"_attr =
_useFCV44RangeDeleterProtocol ? _migrationId->toBSON() : BSONObj());
- auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx, lastOpApplied, _writeConcern);
- uassertStatusOKWithContext(awaitReplicationResult.status,
- awaitReplicationResult.status.codeString());
+ runWithoutSession(outerOpCtx,
+ [&] {
+ auto awaitReplicationResult =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx, lastOpApplied, _writeConcern);
+ uassertStatusOKWithContext(
+ awaitReplicationResult.status,
+ awaitReplicationResult.status.codeString());
+ },
+ _useFCV44RangeDeleterProtocol);
LOGV2(22005,
"Chunk data replicated successfully.",
@@ -1161,6 +1255,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
bool transferAfterCommit = false;
while (getState() == STEADY || getState() == COMMIT_START) {
opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
// Make sure we do at least one transfer after recv'ing the commit message. If we
// aren't sure that at least one transfer happens *after* our state changes to
@@ -1199,7 +1294,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (_flushPendingWrites(opCtx, lastOpApplied)) {
+ if (runWithoutSession(outerOpCtx,
+ [&] { return _flushPendingWrites(opCtx, lastOpApplied); },
+ _useFCV44RangeDeleterProtocol)) {
break;
}
}
@@ -1218,7 +1315,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
migrateThreadHangAtStep5.pauseWhileSet();
}
- _sessionMigration->join();
+ runWithoutSession(
+ outerOpCtx, [&] { _sessionMigration->join(); }, _useFCV44RangeDeleterProtocol);
if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
_setStateFail(redact(_sessionMigration->getErrMsg()));
return;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 83db88a2702..7d21dd60e34 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -588,10 +588,11 @@ void persistMigrationCoordinatorLocally(OperationContext* opCtx,
}
void persistRangeDeletionTaskLocally(OperationContext* opCtx,
- const RangeDeletionTask& deletionTask) {
+ const RangeDeletionTask& deletionTask,
+ const WriteConcernOptions& writeConcern) {
PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace);
try {
- store.add(opCtx, deletionTask);
+ store.add(opCtx, deletionTask, writeConcern);
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
// Convert a DuplicateKey error to an anonymous error.
uasserted(31375,
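Call sites now choose the write concern explicitly; the two patterns in this patch, as a summary sketch:

    // Donor (migration_coordinator.cpp): no session held, so block on majority here.
    migrationutil::persistRangeDeletionTaskLocally(
        opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern);

    // Recipient (migration_destination_manager.cpp): session checked out, so persist
    // with the default write concern and wait for majority after yielding.
    migrationutil::persistRangeDeletionTaskLocally(
        opCtx, recipientDeletionTask, WriteConcernOptions());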
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index d5aa58759aa..16e7833b3ac 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -140,7 +140,8 @@ void persistMigrationCoordinatorLocally(OperationContext* opCtx,
* concern.
*/
void persistRangeDeletionTaskLocally(OperationContext* opCtx,
- const RangeDeletionTask& deletionTask);
+ const RangeDeletionTask& deletionTask,
+ const WriteConcernOptions& writeConcern);
/**
* Updates the migration coordinator document to set the decision field to "committed" and waits for