From c900fa624cda490f8664bff6facfca9f38d243ae Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Tue, 17 May 2022 09:43:29 +0000 Subject: SERVER-65947 MigrationDestinationManager must recover if an error occurs during release of the critical section (cherry picked from commit 5bd946b1fcda93b27ab4c34297454c64c2b9702a) --- etc/backports_required_for_multiversion_tests.yml | 4 + ...gration_retries_on_write_conflict_exceptions.js | 7 +- src/mongo/db/s/migration_destination_manager.cpp | 326 ++++++++++++++------- src/mongo/db/s/migration_destination_manager.h | 30 +- src/mongo/db/s/migration_util.cpp | 12 +- .../db/s/session_catalog_migration_destination.cpp | 30 +- .../db/s/session_catalog_migration_destination.h | 5 +- .../session_catalog_migration_destination_test.cpp | 95 ++++-- 8 files changed, 356 insertions(+), 153 deletions(-) diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 56151a12bde..45dfac12e7a 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -218,6 +218,8 @@ last-continuous: test_file: jstests/core/where_multiple_plans.js - ticket: SERVER-66089 test_file: jstests/replsets/initial_sync_with_partial_transaction.js + - ticket: SERVER-65947 + test_file: jstests/sharding/migration_retries_on_write_conflict_exceptions.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: @@ -571,6 +573,8 @@ last-lts: test_file: jstests/core/where_multiple_plans.js - ticket: SERVER-66089 test_file: jstests/replsets/initial_sync_with_partial_transaction.js + - ticket: SERVER-65947 + test_file: jstests/sharding/migration_retries_on_write_conflict_exceptions.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: diff --git a/jstests/sharding/migration_retries_on_write_conflict_exceptions.js b/jstests/sharding/migration_retries_on_write_conflict_exceptions.js index 0dd66c5f660..2c5c4656a0b 100644 --- a/jstests/sharding/migration_retries_on_write_conflict_exceptions.js +++ b/jstests/sharding/migration_retries_on_write_conflict_exceptions.js @@ -59,5 +59,10 @@ preTransferModsFailpoint.off(); awaitResult(); +// After the migration has finished, check that writes are possible on both shards (meaning the +// critical sections have been properly released). 
+assert.commandWorked(testColl.update({x: 49}, {$set: {c: 1}})); +assert.commandWorked(testColl.update({x: 50}, {$set: {c: 1}})); + st.stop(); -})(); \ No newline at end of file +})(); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index cb9b554ff88..62ace3566b0 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -282,9 +282,9 @@ bool migrationRecipientRecoveryDocumentExists(OperationContext* opCtx, PersistentTaskStore store( NamespaceString::kMigrationRecipientsNamespace); - BSONObjBuilder bob; - sessionId.append(&bob); - return store.count(opCtx, bob.obj()) > 0; + return store.count(opCtx, + BSON(MigrationRecipientRecoveryDocument::kMigrationSessionIdFieldName + << sessionId.toString())) > 0; } // Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread which @@ -303,10 +303,17 @@ MONGO_FAIL_POINT_DEFINE(migrationRecipientFailPostCommitRefresh); } // namespace +const ReplicaSetAwareServiceRegistry::Registerer mdmRegistry( + "MigrationDestinationManager"); + MigrationDestinationManager::MigrationDestinationManager() = default; MigrationDestinationManager::~MigrationDestinationManager() = default; +MigrationDestinationManager* MigrationDestinationManager::get(ServiceContext* serviceContext) { + return &getMigrationDestinationManager(serviceContext); +} + MigrationDestinationManager* MigrationDestinationManager::get(OperationContext* opCtx) { return &getMigrationDestinationManager(opCtx->getServiceContext()); } @@ -463,6 +470,12 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _sessionId = cloneRequest.getSessionId(); _scopedReceiveChunk = std::move(scopedReceiveChunk); + invariant(!_canReleaseCriticalSectionPromise); + _canReleaseCriticalSectionPromise = std::make_unique>(); + + invariant(!_migrateThreadFinishedPromise); + _migrateThreadFinishedPromise = std::make_unique>(); + // TODO: If we are here, the migrate thread must have completed, otherwise _active above // would be false, so this would never block. There is no better place with the current // implementation where to join the thread. 
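The hand-off that start() wires up in the hunk above can be illustrated in isolation: one promise that the exitCriticalSection() command path fulfils to tell the migrate thread it may release the critical section, and a second promise that the migrate thread fulfils with its final state so the waiting command can report success or failure. Below is a minimal standalone sketch of that two-promise pattern using plain std::promise/std::shared_future; the enum, the variable names, and the threading setup are illustrative only and are not the MongoDB SharedPromise API.

// Standalone illustration (not MongoDB code) of the two-promise hand-off:
// the command thread signals "you may release the critical section" and then
// waits for the worker's final state before acknowledging the donor.
#include <future>
#include <iostream>
#include <thread>

enum class State { kEnteredCritSec, kExitCritSec, kDone, kFail };

int main() {
    std::promise<void> canReleaseCritSec;    // fulfilled by the "command" thread
    std::promise<State> threadFinished;      // fulfilled by the "migrate" thread
    std::shared_future<State> finishedFuture = threadFinished.get_future().share();

    std::thread migrateThread([&] {
        // ... clone documents, enter the critical section ...
        canReleaseCritSec.get_future().wait();   // block until told to release
        // ... refresh metadata, release the critical section ...
        threadFinished.set_value(State::kDone);  // report the final state to the waiter
    });

    // exitCriticalSection equivalent: signal, then wait for the worker's outcome.
    canReleaseCritSec.set_value();
    const State finalState = finishedFuture.get();
    std::cout << (finalState == State::kDone ? "released\n" : "failed\n");

    migrateThread.join();
    return 0;
}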
@@ -470,11 +483,13 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _migrateThreadHandle.join(); } - _sessionMigration = - std::make_unique(_nss, _fromShard, *_sessionId); + _sessionMigration = std::make_unique( + _nss, _fromShard, *_sessionId, _cancellationSource.token()); ShardingStatistics::get(opCtx).countRecipientMoveChunkStarted.addAndFetch(1); - _migrateThreadHandle = stdx::thread([this]() { _migrateThread(); }); + _migrateThreadHandle = stdx::thread([this, cancellationToken = _cancellationSource.token()]() { + _migrateThread(cancellationToken); + }); return Status::OK(); } @@ -483,7 +498,8 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState( OperationContext* opCtx, ScopedReceiveChunk scopedReceiveChunk, const MigrationRecipientRecoveryDocument& recoveryDoc) { - invariant(!isActive()); + stdx::lock_guard sl(_mutex); + invariant(!_sessionId); _scopedReceiveChunk = std::move(scopedReceiveChunk); _nss = recoveryDoc.getNss(); @@ -496,26 +512,21 @@ Status MigrationDestinationManager::restoreRecoveredMigrationState( _state = kCommitStart; _acquireCSOnRecipient = true; - LOGV2(6064500, "Recovering migration recipient", "sessionId"_attr = *_sessionId); - - RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( - opCtx, _nss, criticalSectionReason(*_sessionId), ShardingCatalogClient::kLocalWriteConcern); + invariant(!_canReleaseCriticalSectionPromise); + _canReleaseCriticalSectionPromise = std::make_unique>(); - LOGV2_DEBUG(6064501, - 2, - "Reacquired migration recipient critical section", - "sessionId"_attr = *_sessionId); + invariant(!_migrateThreadFinishedPromise); + _migrateThreadFinishedPromise = std::make_unique>(); - _state = kEnteredCritSec; + LOGV2(6064500, "Recovering migration recipient", "sessionId"_attr = *_sessionId); if (_migrateThreadHandle.joinable()) { _migrateThreadHandle.join(); } - _migrateThreadHandle = - stdx::thread([this]() { _migrateThread(true /* skipToCritSecTaken */); }); - - LOGV2(6064503, "Recovered migration recipient", "sessionId"_attr = *_sessionId); + _migrateThreadHandle = stdx::thread([this, cancellationToken = _cancellationSource.token()]() { + _migrateThread(cancellationToken, true /* skipToCritSecTaken */); + }); return Status::OK(); } @@ -711,51 +722,52 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio Status MigrationDestinationManager::exitCriticalSection(OperationContext* opCtx, const MigrationSessionId& sessionId) { - stdx::unique_lock lock(_mutex); - if (!_sessionId || !_sessionId->matches(sessionId)) { - LOGV2_DEBUG(5899104, - 2, - "Request to exit recipient critical section does not match current session", - "requested"_attr = sessionId, - "current"_attr = _sessionId); - - if (migrationRecipientRecoveryDocumentExists(opCtx, sessionId)) { - // This node may have stepped down and interrupted the migrateThread, which reset - // _sessionId. But the critical section may not have been released so it will be - // recovered by the new primary. 
- return {ErrorCodes::CommandFailed, - "Recipient migration recovery document still exists"}; - } + SharedSemiFuture threadFinishedFuture; + { + stdx::unique_lock lock(_mutex); + if (!_sessionId || !_sessionId->matches(sessionId)) { + LOGV2_DEBUG(5899104, + 2, + "Request to exit recipient critical section does not match current session", + "requested"_attr = sessionId, + "current"_attr = _sessionId); + + if (migrationRecipientRecoveryDocumentExists(opCtx, sessionId)) { + // This node may have stepped down and interrupted the migrateThread, which reset + // _sessionId. But the critical section may not have been released so it will be + // recovered by the new primary. + return {ErrorCodes::CommandFailed, + "Recipient migration recovery document still exists"}; + } - // Ensure the command's wait for writeConcern will until the recovery document is deleted. - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + // Ensure the command's wait for writeConcern will until the recovery document is + // deleted. + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - return Status::OK(); - } + return Status::OK(); + } - if (_state < kEnteredCritSec) { - return {ErrorCodes::CommandFailed, "recipient critical section has not yet been entered"}; - } + if (_state < kEnteredCritSec) { + return {ErrorCodes::CommandFailed, + "recipient critical section has not yet been entered"}; + } - // If the thread is waiting to be signaled to release the CS, signal it by transitioning - // _state to EXIT_CRIT_SEC - if (_state == kEnteredCritSec) { - _state = kExitCritSec; - _stateChangedCV.notify_all(); + // Fulfill the promise to let the migrateThread release the critical section. + invariant(_canReleaseCriticalSectionPromise); + if (!_canReleaseCriticalSectionPromise->getFuture().isReady()) { + _canReleaseCriticalSectionPromise->emplaceValue(); + } + + threadFinishedFuture = _migrateThreadFinishedPromise->getFuture(); } - // Wait for the thread to finish - opCtx->waitForConditionOrInterrupt(_isActiveCV, lock, [&]() { return !_sessionId; }); + // Wait for the migrateThread to finish + const auto threadFinishState = threadFinishedFuture.get(opCtx); - if (_state != kDone) { + if (threadFinishState != kDone) { return {ErrorCodes::CommandFailed, "exitCriticalSection failed"}; } - dassert( - !migrationRecipientRecoveryDocumentExists(opCtx, sessionId), - str::stream() << "Recipient migration recovery document should not exist exist. 
SessionId: " - << sessionId.toString()); - LOGV2_DEBUG( 5899105, 2, "Succeeded releasing recipient critical section", "requested"_attr = sessionId); @@ -1041,7 +1053,10 @@ void MigrationDestinationManager::cloneCollectionIndexesAndOptions( } } -void MigrationDestinationManager::_migrateThread(bool skipToCritSecTaken) { +void MigrationDestinationManager::_migrateThread(CancellationToken cancellationToken, + bool skipToCritSecTaken) { + invariant(_sessionId); + Client::initThread("migrateThread"); auto client = Client::getCurrent(); { @@ -1049,41 +1064,77 @@ void MigrationDestinationManager::_migrateThread(bool skipToCritSecTaken) { client->setSystemOperationKillableByStepdown(lk); } - auto uniqueOpCtx = client->makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); + bool recovering = false; + while (true) { + const auto executor = + Grid::get(client->getServiceContext())->getExecutorPool()->getFixedExecutor(); + auto uniqueOpCtx = + CancelableOperationContext(client->makeOperationContext(), cancellationToken, executor); + auto opCtx = uniqueOpCtx.get(); - if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) { - AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx); - } + if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) { + AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx); + } - try { - // The outer OperationContext is used to hold the session checked out for the - // duration of the recipient's side of the migration. This guarantees that if the - // donor shard has failed over, then the new donor primary cannot bump the - // txnNumber on this session while this node is still executing the recipient side - // (which is important because otherwise, this node may create orphans after the - // range deletion task on this node has been processed). The recipient will periodically - // yield this session, but will verify the txnNumber has not changed before continuing, - // preserving the guarantee that orphans cannot be created after the txnNumber is advanced. - opCtx->setLogicalSessionId(_lsid); - opCtx->setTxnNumber(_txnNumber); - - MongoDOperationContextSession sessionTxnState(opCtx); - - auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - {*opCtx->getTxnNumber()}, - boost::none /* autocommit */, - boost::none /* startTransaction */); - _migrateDriver(opCtx, skipToCritSecTaken); - } catch (...) { - _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); + try { + if (recovering) { + if (!migrationRecipientRecoveryDocumentExists(opCtx, *_sessionId)) { + // No need to run any recovery. + break; + } + } + + // The outer OperationContext is used to hold the session checked out for the + // duration of the recipient's side of the migration. This guarantees that if the + // donor shard has failed over, then the new donor primary cannot bump the + // txnNumber on this session while this node is still executing the recipient side + // (which is important because otherwise, this node may create orphans after the + // range deletion task on this node has been processed). The recipient will periodically + // yield this session, but will verify the txnNumber has not changed before continuing, + // preserving the guarantee that orphans cannot be created after the txnNumber is + // advanced. 
+ opCtx->setLogicalSessionId(_lsid); + opCtx->setTxnNumber(_txnNumber); + + MongoDOperationContextSession sessionTxnState(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue(opCtx, + {*opCtx->getTxnNumber()}, + boost::none /* autocommit */, + boost::none /* startTransaction */); + _migrateDriver(opCtx, skipToCritSecTaken || recovering); + } catch (...) { + _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); + + if (!cancellationToken.isCanceled() && _acquireCSOnRecipient) { + // Run recovery if needed. + recovering = true; + continue; + } + } + + break; } stdx::lock_guard lk(_mutex); _sessionId.reset(); _scopedReceiveChunk.reset(); _isActiveCV.notify_all(); + + // If we reached this point without having set _canReleaseCriticalSectionPromise we must be on + // an error path. Just set the promise with error because it is illegal to leave it unset on + // destruction. + invariant(_canReleaseCriticalSectionPromise); + if (!_canReleaseCriticalSectionPromise->getFuture().isReady()) { + _canReleaseCriticalSectionPromise->setError( + {ErrorCodes::CallbackCanceled, "explicitly breaking release critical section promise"}); + } + _canReleaseCriticalSectionPromise.reset(); + + invariant(_migrateThreadFinishedPromise); + _migrateThreadFinishedPromise->emplaceValue(_state); + _migrateThreadFinishedPromise.reset(); } void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, @@ -1592,10 +1643,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, "Persisted migration recipient recovery document", "sessionId"_attr = _sessionId); - // Enter critical section + // Enter critical section. Ensure it has been majority commited before + // _recvChunkCommit returns success to the donor, so that if the recipient steps + // down, the critical section is kept taken while the donor commits the migration. RecoverableCriticalSectionService::get(opCtx) ->acquireRecoverableCriticalSectionBlockWrites( - opCtx, _nss, critSecReason, ShardingCatalogClient::kLocalWriteConcern); + opCtx, _nss, critSecReason, ShardingCatalogClient::kMajorityWriteConcern); LOGV2(5899114, "Entered migration recipient critical section", logAttrs(_nss)); timeInCriticalSection.emplace(); @@ -1614,6 +1667,41 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, } } } + } else { + // We can only ever be in this path if the recipient critical section feature is enabled. 
+ invariant(_acquireCSOnRecipient); + + outerOpCtx->setAlwaysInterruptAtStepDownOrUp(); + auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator"); + { + stdx::lock_guard lk(*newClient.get()); + newClient->setSystemOperationKillableByStepdown(lk); + } + AlternativeClientRegion acr(newClient); + auto executor = + Grid::get(outerOpCtx->getServiceContext())->getExecutorPool()->getFixedExecutor(); + auto newOpCtxPtr = CancelableOperationContext( + cc().makeOperationContext(), outerOpCtx->getCancellationToken(), executor); + auto opCtx = newOpCtxPtr.get(); + + RecoverableCriticalSectionService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites( + opCtx, + _nss, + criticalSectionReason(*_sessionId), + ShardingCatalogClient::kMajorityWriteConcern); + + LOGV2_DEBUG(6064501, + 2, + "Reacquired migration recipient critical section", + "sessionId"_attr = *_sessionId); + + { + stdx::lock_guard sl(_mutex); + _state = kEnteredCritSec; + _stateChangedCV.notify_all(); + } + + LOGV2(6064503, "Recovered migration recipient", "sessionId"_attr = *_sessionId); } if (_acquireCSOnRecipient) { @@ -1795,35 +1883,33 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi OperationContext* opCtx, const Timer& timeInCriticalSection) { // Wait until the migrate thread is signaled to release the critical section LOGV2_DEBUG(5899111, 3, "Waiting for release critical section signal"); - { - stdx::unique_lock lock(_mutex); - opCtx->waitForConditionOrInterrupt( - _stateChangedCV, lock, [&]() { return _state != kEnteredCritSec; }); - } + invariant(_canReleaseCriticalSectionPromise); + _canReleaseCriticalSectionPromise->getFuture().get(opCtx); - invariant(_state == kExitCritSec || _state == kFail || _state == kAbort); + _setState(kExitCritSec); // Refresh the filtering metadata - if (_state == kExitCritSec) { - LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section"); + LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section"); - try { - if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) { - uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint"); - } + bool refreshFailed = false; + try { + if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) { + uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint"); + } - forceShardFilteringMetadataRefresh(opCtx, _nss); - } catch (const DBException& ex) { - LOGV2_DEBUG(5899103, - 2, - "Post-migration commit refresh failed on recipient", - "migrationId"_attr = _migrationId, - "error"_attr = redact(ex)); + forceShardFilteringMetadataRefresh(opCtx, _nss); + } catch (const DBException& ex) { + LOGV2_DEBUG(5899103, + 2, + "Post-migration commit refresh failed on recipient", + "migrationId"_attr = _migrationId, + "error"_attr = redact(ex)); + refreshFailed = true; + } - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, _nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); - } + if (refreshFailed) { + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); } // Release the critical section @@ -1853,4 +1939,28 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi migrationutil::deleteMigrationRecipientRecoveryDocument(opCtx, *_migrationId); } +void 
MigrationDestinationManager::onStepUpBegin(OperationContext* opCtx, long long term) { + stdx::lock_guard sl(_mutex); + auto newCancellationSource = CancellationSource(); + std::swap(_cancellationSource, newCancellationSource); +} + +void MigrationDestinationManager::onStepDown() { + boost::optional> migrateThreadFinishedFuture; + { + stdx::lock_guard sl(_mutex); + // Cancel any migrateThread work. + _cancellationSource.cancel(); + + if (_migrateThreadFinishedPromise) { + migrateThreadFinishedFuture = _migrateThreadFinishedPromise->getFuture(); + } + } + + // Wait for the migrateThread to finish. + if (migrateThreadFinishedFuture) { + migrateThreadFinishedFuture->wait(); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index c1421fc51ea..925296091cb 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -37,6 +37,7 @@ #include "mongo/bson/oid.h" #include "mongo/client/connection_string.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/migration_recipient_recovery_document_gen.h" #include "mongo/db/s/migration_session_id.h" @@ -46,6 +47,7 @@ #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" +#include "mongo/util/cancellation.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/timer.h" @@ -70,7 +72,8 @@ struct CollectionOptionsAndIndexes { /** * Drives the receiving side of the MongoD migration process. One instance exists per shard. */ -class MigrationDestinationManager { +class MigrationDestinationManager + : public ReplicaSetAwareServiceShardSvr { MigrationDestinationManager(const MigrationDestinationManager&) = delete; MigrationDestinationManager& operator=(const MigrationDestinationManager&) = delete; @@ -94,6 +97,7 @@ public: /** * Returns the singleton instance of the migration destination manager. */ + static MigrationDestinationManager* get(ServiceContext* serviceContext); static MigrationDestinationManager* get(OperationContext* opCtx); State getState() const; @@ -211,7 +215,7 @@ private: /** * Thread which drives the migration apply process on the recipient side. */ - void _migrateThread(bool skipToCritSecTaken = false); + void _migrateThread(CancellationToken cancellationToken, bool skipToCritSecTaken = false); void _migrateDriver(OperationContext* opCtx, bool skipToCritSecTaken = false); @@ -255,6 +259,17 @@ private: void awaitCriticalSectionReleaseSignalAndCompleteMigration(OperationContext* opCtx, const Timer& timeInCriticalSection); + /** + * ReplicaSetAwareService entry points. + */ + void onStartup(OperationContext* opCtx) final {} + void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) final {} + void onShutdown() final {} + void onStepUpBegin(OperationContext* opCtx, long long term) final; + void onStepUpComplete(OperationContext* opCtx, long long term) final {} + void onStepDown() final; + void onBecomeArbiter() final {} + // Mutex to guard all fields mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationDestinationManager::_mutex"); @@ -303,6 +318,17 @@ private: stdx::condition_variable _stateChangedCV; bool _acquireCSOnRecipient{false}; + + // Promise that will be fulfilled when the donor has signaled us that we can release the + // critical section. 
+ std::unique_ptr> _canReleaseCriticalSectionPromise; + + // Promise that will be fulfilled when the migrateThread has finished its work. + std::unique_ptr> _migrateThreadFinishedPromise; + + // Cancellation source that is cancelled on stepdowns. On stepup, a new cancellation source will + // be installed. + CancellationSource _cancellationSource; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index e4cd3ce5683..ec63e7477af 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -198,7 +198,8 @@ void sendWriteCommandToRecipient(OperationContext* opCtx, void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( OperationContext* opCtx, StringData taskDescription, - std::function doWork) { + std::function doWork, + boost::optional backoff = boost::none) { const std::string newClientName = "{}-{}"_format(getThreadName(), taskDescription); const auto initialTerm = repl::ReplicationCoordinator::get(opCtx)->getTerm(); @@ -236,6 +237,10 @@ void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( doWork(newOpCtx.get()); break; } catch (DBException& ex) { + if (backoff) { + sleepFor(backoff->nextSleep()); + } + if (attempt % kLogRetryAttemptThreshold == 1) { LOGV2_WARNING(23937, "Retrying task after failed attempt", @@ -1232,8 +1237,9 @@ ExecutorFuture launchReleaseCriticalSectionOnRecipientFuture( "shardId"_attr = recipientShardId, "sessionId"_attr = sessionId, "error"_attr = exShardNotFound); - }; - }); + } + }, + Backoff(Seconds(1), Milliseconds::max())); }); } diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 5505acde25e..f00a2215460 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -37,6 +37,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/connection_string.h" +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/logical_session_id.h" @@ -199,7 +200,9 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, * information. The new oplogEntry will also link to prePostImageTs if not null. 
*/ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, - const ProcessOplogResult& lastResult) { + const ProcessOplogResult& lastResult, + ServiceContext* serviceContext, + CancellationToken cancellationToken) { auto oplogEntry = parseOplog(oplogBSON); ProcessOplogResult result; @@ -245,7 +248,9 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, const auto stmtIds = oplogEntry.getStatementIds(); - auto uniqueOpCtx = cc().makeOperationContext(); + auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); + auto uniqueOpCtx = + CancelableOperationContext(cc().makeOperationContext(), cancellationToken, executor); auto opCtx = uniqueOpCtx.get(); opCtx->setLogicalSessionId(result.sessionId); opCtx->setTxnNumber(result.txnNum); @@ -342,10 +347,14 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, } // namespace SessionCatalogMigrationDestination::SessionCatalogMigrationDestination( - NamespaceString nss, ShardId fromShard, MigrationSessionId migrationSessionId) + NamespaceString nss, + ShardId fromShard, + MigrationSessionId migrationSessionId, + CancellationToken cancellationToken) : _nss(std::move(nss)), _fromShard(std::move(fromShard)), - _migrationSessionId(std::move(migrationSessionId)) {} + _migrationSessionId(std::move(migrationSessionId)), + _cancellationToken(std::move(cancellationToken)) {} SessionCatalogMigrationDestination::~SessionCatalogMigrationDestination() { if (_thread.joinable()) { @@ -427,7 +436,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service BSONObj nextBatch; BSONArray oplogArray; { - auto uniqueCtx = cc().makeOperationContext(); + auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor(); + auto uniqueCtx = CancelableOperationContext( + cc().makeOperationContext(), _cancellationToken, executor); auto opCtx = uniqueCtx.get(); nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId); @@ -482,7 +493,8 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } for (BSONArrayIteratorSorted oplogIter(oplogArray); oplogIter.more();) { try { - lastResult = processSessionOplog(oplogIter.next().Obj(), lastResult); + lastResult = processSessionOplog( + oplogIter.next().Obj(), lastResult, service, _cancellationToken); } catch (const ExceptionFor&) { // This means that the server has a newer txnNumber than the oplog being // migrated, so just skip it @@ -492,7 +504,11 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } WriteConcernResult unusedWCResult; - auto uniqueOpCtx = cc().makeOperationContext(); + + auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor(); + auto uniqueOpCtx = + CancelableOperationContext(cc().makeOperationContext(), _cancellationToken, executor); + uassertStatusOK( waitForWriteConcern(uniqueOpCtx.get(), lastResult.oplogTime, kMajorityWC, &unusedWCResult)); diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h index 890e3eec033..0dededc6699 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.h +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -40,6 +40,7 @@ #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" +#include "mongo/util/cancellation.h" #include "mongo/util/concurrency/with_lock.h" namespace mongo { @@ -69,7 +70,8 @@ public: SessionCatalogMigrationDestination(NamespaceString nss, 
ShardId fromShard, - MigrationSessionId migrationSessionId); + MigrationSessionId migrationSessionId, + CancellationToken cancellationToken); ~SessionCatalogMigrationDestination(); /** @@ -113,6 +115,7 @@ private: const NamespaceString _nss; const ShardId _fromShard; const MigrationSessionId _migrationSessionId; + const CancellationToken _cancellationToken; stdx::thread _thread; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 0b634db0ff6..d1e5a11848c 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -281,6 +281,8 @@ public: ->setFilteringMetadata(operationContext(), CollectionMetadata()); } + CancellationSource _cancellationSource; + private: std::unique_ptr makeShardingCatalogClient() override { class StaticCatalogClient final : public ShardingCatalogClientMock { @@ -328,7 +330,8 @@ private: }; TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTransfer) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); finishSessionExpectSuccess(&sessionMigration); @@ -337,7 +340,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTr TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -397,7 +401,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithMultiStmtIds) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -460,7 +465,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithMultiStmtIds) { TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -515,7 +521,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -579,7 +586,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, 
OplogEntriesWithDifferentSession) const auto sessionId1 = makeLogicalSessionIdForTest(); const auto sessionId2 = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo1; @@ -662,7 +670,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -727,7 +736,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -819,7 +829,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPreImageFindAndModify) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -913,7 +924,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPreImag TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -1004,7 +1016,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPostImageFindAndModify) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -1096,7 +1109,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleForgedPostIma TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo 
sessionInfo; @@ -1202,7 +1216,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { << "newerSess"), 0); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo oldSessionInfo; @@ -1253,7 +1268,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt auto opCtx = operationContext(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo oldSessionInfo; @@ -1310,7 +1326,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1325,7 +1342,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkErr } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoOplog) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1340,7 +1358,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithBadOplogFormat) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1359,7 +1378,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoSessionId) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1384,7 +1404,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoTxnNumber) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1408,7 +1429,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWithNoStmtId) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination 
sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1438,7 +1460,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, const auto sessionId = makeLogicalSessionIdForTest(); auto opCtx = operationContext(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); OperationSessionInfo sessionInfo; @@ -1503,7 +1526,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -1541,7 +1565,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImage TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNonMatchingSessionId) { - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -1582,7 +1607,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNonMatchingTxnNum) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -1624,7 +1650,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorIfPreImageOplogFollowWithOplogWithNoPreImageLink) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -1663,7 +1690,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorIfOplogWithPreImageLinkIsPrecededByNormalOplog) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -1704,7 +1732,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorIfOplogWithPostImageLinkIsPrecededByNormalOplog) { const auto sessionId = makeLogicalSessionIdForTest(); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); @@ -1751,7 +1780,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30); - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + 
SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); auto oplog1 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime @@ -1834,7 +1864,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory Date_t::now(), // wall clock time {5})}; // statement ids - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -1914,7 +1945,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, Date_t::now(), // wall clock time {55})}; // statement ids - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); @@ -2008,7 +2040,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, MigratingKnownStmtWhileOplogTrunc Date_t::now(), // wall clock time {kStmtId}); // statement ids - SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId()); + SessionCatalogMigrationDestination sessionMigration( + kNs, kFromShard, migrationId(), _cancellationSource.token()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); -- cgit v1.2.1
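The ReplicaSetAwareService hooks introduced by this patch follow a small lifecycle: onStepUpBegin() installs a fresh cancellation source for the new term, and onStepDown() cancels the current one and then blocks until the migrate thread has fully drained. Below is a minimal standalone sketch of that lifecycle, written against C++20 std::stop_source/std::stop_token instead of MongoDB's CancellationSource; the class and member names are illustrative only.

// Standalone sketch (not MongoDB code): cancel in-flight work on step-down and
// install a fresh token source on step-up, waiting for the worker to drain.
#include <chrono>
#include <future>
#include <iostream>
#include <stop_token>
#include <thread>

class Manager {
public:
    void onStepUpBegin() {
        // Replace the (possibly cancelled) source with a fresh one for the new term.
        _source = std::stop_source{};
    }

    void startWork() {
        _finished = std::async(std::launch::async, [token = _source.get_token()] {
            while (!token.stop_requested()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));  // "migrate" work
            }
        });
    }

    void onStepDown() {
        _source.request_stop();    // interrupt the worker
        if (_finished.valid()) {
            _finished.wait();      // block until the worker has fully drained
        }
    }

private:
    std::stop_source _source;
    std::future<void> _finished;
};

int main() {
    Manager m;
    m.onStepUpBegin();
    m.startWork();
    m.onStepDown();
    std::cout << "worker drained after step-down\n";
}

Swapping in a new source on step-up, rather than reusing the cancelled one, is what lets a node that steps down and later steps back up start a fresh recovery attempt without racing against tokens handed out during the previous term.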