diff options
6 files changed, 348 insertions, 213 deletions
diff --git a/src/mongo/db/s/resharding/resharding_future_util.h b/src/mongo/db/s/resharding/resharding_future_util.h index 6ca27ed3e78..7ebb8c1b31d 100644 --- a/src/mongo/db/s/resharding/resharding_future_util.h +++ b/src/mongo/db/s/resharding/resharding_future_util.h @@ -31,6 +31,7 @@ #include <vector> +#include "mongo/db/cancelable_operation_context.h" #include "mongo/util/cancellation.h" #include "mongo/util/functional.h" #include "mongo/util/future.h" @@ -167,5 +168,24 @@ private: unique_function<void(const Status&)> _onUnrecoverableError; }; +/** + * Wrapper class around CancelableOperationContextFactory which uses resharding::WithAutomaticRetry + * to ensure all cancelable operations will be retried if able upon failure. + */ +class RetryingCancelableOperationContextFactory { +public: + RetryingCancelableOperationContextFactory(CancellationToken cancelToken, ExecutorPtr executor) + : _factory{std::move(cancelToken), std::move(executor)} {} + + template <typename BodyCallable> + decltype(auto) withAutomaticRetry(BodyCallable&& body) const { + return resharding::WithAutomaticRetry([this, body]() { return body(_factory); }); + } + + +private: + const CancelableOperationContextFactory _factory; +}; + } // namespace resharding } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 8ba5e591e73..be231bce447 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -44,6 +44,7 @@ namespace mongo { namespace { constexpr auto kAnotherOperationInProgress = "Another operation is in progress"; constexpr auto kNoOperationInProgress = "No operation is in progress"; +constexpr auto kMetricsSetBeforeRestore = "Expected metrics to be 0 prior to restore"; constexpr auto kTotalOps = "countReshardingOperations"; constexpr auto kSuccessfulOps = "countReshardingSuccessful"; @@ -564,16 +565,10 @@ void 
ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex invariant(checkState(*_currentOp->recipientState, {RecipientStateEnum::kCloning, RecipientStateEnum::kError})); - onDocumentsCopiedForCurrentOp(documents, bytes); - _cumulativeOp->documentsCopied += documents; - _cumulativeOp->bytesCopied += bytes; -} - -void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept { - invariant(_currentOp, kNoOperationInProgress); - _currentOp->documentsCopied += documents; _currentOp->bytesCopied += bytes; + _cumulativeOp->documentsCopied += documents; + _cumulativeOp->bytesCopied += bytes; } void ReshardingMetrics::gotInserts(int n) noexcept { @@ -654,14 +649,8 @@ void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept { *_currentOp->recipientState, {RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError})); - onOplogEntriesFetchedForCurrentOp(entries); - _cumulativeOp->oplogEntriesFetched += entries; -} - -void ReshardingMetrics::onOplogEntriesFetchedForCurrentOp(int64_t entries) noexcept { - invariant(_currentOp, kNoOperationInProgress); - _currentOp->oplogEntriesFetched += entries; + _cumulativeOp->oplogEntriesFetched += entries; } void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept { @@ -672,14 +661,24 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept { invariant(checkState(*_currentOp->recipientState, {RecipientStateEnum::kApplying, RecipientStateEnum::kError})); - onOplogEntriesAppliedForCurrentOp(entries); + _currentOp->oplogEntriesApplied += entries; _cumulativeOp->oplogEntriesApplied += entries; } -void ReshardingMetrics::onOplogEntriesAppliedForCurrentOp(int64_t entries) noexcept { +void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied, + int64_t documentBytesCopied, + int64_t oplogEntriesFetched, + int64_t oplogEntriesApplied) noexcept { invariant(_currentOp, kNoOperationInProgress); - - 
_currentOp->oplogEntriesApplied += entries; + invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore); + invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore); + invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore); + invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore); + + _currentOp->documentsCopied = documentCountCopied; + _currentOp->bytesCopied = documentBytesCopied; + _currentOp->oplogEntriesFetched = oplogEntriesFetched; + _currentOp->oplogEntriesApplied = oplogEntriesApplied; } void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index 24ab1a9a38a..480dd25ed8a 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -84,8 +84,6 @@ public: void setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept; // Allows updating metrics on "documents to copy" so long as the recipient is in cloning state. void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept; - // Allows updating metrics on "documents to copy". - void onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept; // Allows updating metrics on "opcounters"; void gotInserts(int n) noexcept; @@ -116,9 +114,12 @@ public: // Allows updating "oplog entries to apply" metrics when the recipient is in applying state. void onOplogEntriesFetched(int64_t entries) noexcept; // Allows restoring "oplog entries to apply" metrics. 
- void onOplogEntriesFetchedForCurrentOp(int64_t entries) noexcept; void onOplogEntriesApplied(int64_t entries) noexcept; - void onOplogEntriesAppliedForCurrentOp(int64_t entries) noexcept; + + void restoreForCurrentOp(int64_t documentCountCopied, + int64_t documentBytesCopied, + int64_t oplogEntriesFetched, + int64_t oplogEntriesApplied) noexcept; // Allows tracking writes during a critical section when the donor's state is either of // "donating-oplog-entries" or "blocking-writes". diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 938fbefb2aa..380fb5c5616 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -74,6 +74,7 @@ MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint); MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientBeforeCloning); MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringCloning); MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringOplogApplication); +MONGO_FAIL_POINT_DEFINE(reshardingOpCtxKilledWhileRestoringMetrics); MONGO_FAIL_POINT_DEFINE(reshardingRecipientFailsAfterTransitionToCloning); namespace { @@ -168,25 +169,28 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_runUntilStrictConsistencyOrErrored( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken) noexcept { - return resharding::WithAutomaticRetry([this, executor, abortToken] { - return ExecutorFuture(**executor) - .then([this, executor, abortToken] { - return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( - executor, abortToken); - }) - .then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); }) - .then([this, executor, abortToken] { - return _cloneThenTransitionToApplying(executor, abortToken); - }) - .then([this, executor, abortToken] { - return 
_awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( - executor, abortToken); - }); - }) + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor, abortToken](const auto& factory) { + return ExecutorFuture(**executor) + .then([this, executor, abortToken, &factory] { + return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( + executor, abortToken, factory); + }) + .then([this, &factory] { + _createTemporaryReshardingCollectionThenTransitionToCloning(factory); + }) + .then([this, executor, abortToken, &factory] { + return _cloneThenTransitionToApplying(executor, abortToken, factory); + }) + .then([this, executor, abortToken, &factory] { + return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( + executor, abortToken, factory); + }); + }) .onTransientError([](const Status& status) { LOGV2(5551100, "Recipient _runUntilStrictConsistencyOrErrored encountered transient error", - "error"_attr = status); + "error"_attr = redact(status)); }) .onUnrecoverableError([](const Status& status) {}) .until<Status>([abortToken](const Status& status) { return status.isOK(); }) @@ -200,23 +204,24 @@ ReshardingRecipientService::RecipientStateMachine::_runUntilStrictConsistencyOrE "Resharding operation recipient state machine failed", "namespace"_attr = _metadata.getSourceNss(), "reshardingUUID"_attr = _metadata.getReshardingUUID(), - "error"_attr = status); - - return resharding::WithAutomaticRetry([this, status] { - // It is illegal to transition into kError if the state has already surpassed - // kStrictConsistency. - invariant(_recipientCtx.getState() < RecipientStateEnum::kStrictConsistency); - _transitionToError(status); - - // Intentionally swallow the error - by transitioning to kError, the - // recipient effectively recovers from encountering the error and - // should continue running in the future chain. 
- }) + "error"_attr = redact(status)); + + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, status](const auto& factory) { + // It is illegal to transition into kError if the state has already surpassed + // kStrictConsistency. + invariant(_recipientCtx.getState() < RecipientStateEnum::kStrictConsistency); + _transitionToError(status, factory); + + // Intentionally swallow the error - by transitioning to kError, the + // recipient effectively recovers from encountering the error and + // should continue running in the future chain. + }) .onTransientError([](const Status& status) { LOGV2(5551104, "Recipient _runUntilStrictConsistencyOrErrored encountered transient " "error while transitioning to state kError", - "error"_attr = status); + "error"_attr = redact(status)); }) .onUnrecoverableError([](const Status& status) {}) .until<Status>([](const Status& retryStatus) { return retryStatus.isOK(); }) @@ -248,15 +253,16 @@ ReshardingRecipientService::RecipientStateMachine::_notifyCoordinatorAndAwaitDec return ExecutorFuture(**executor); } - return resharding::WithAutomaticRetry([this, executor] { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - return _updateCoordinator(opCtx.get(), executor); - }) + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor](const auto& factory) { + auto opCtx = factory.makeOperationContext(&cc()); + return _updateCoordinator(opCtx.get(), executor, factory); + }) .onTransientError([](const Status& status) { LOGV2(5551102, "Transient error while notifying coordinator of recipient state for the " "coordinator's decision", - "error"_attr = status); + "error"_attr = redact(status)); }) .onUnrecoverableError([](const Status& status) {}) .until<Status>([](const Status& status) { return status.isOK(); }) @@ -271,70 +277,70 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_finishR const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const 
CancellationToken& stepdownToken, bool aborted) noexcept { - return resharding::WithAutomaticRetry([this, executor, aborted, stepdownToken] { - return ExecutorFuture<void>(**executor) - .then([this, executor, aborted, stepdownToken] { - if (aborted) { - return future_util::withCancellation( - _dataReplicationQuiesced.thenRunOn(**executor), stepdownToken) - .thenRunOn(**executor) - .onError([](Status status) { - // Wait for all of the data replication components to halt. We - // ignore any errors because resharding is known to have failed - // already. - return Status::OK(); - }); - } else { - _renameTemporaryReshardingCollection(); - return ExecutorFuture<void>(**executor, Status::OK()); - } - }) - .then([this, aborted] { - // It is safe to drop the oplog collections once either (1) the - // collection is renamed or (2) the operation is aborting. - invariant(_recipientCtx.getState() >= - RecipientStateEnum::kStrictConsistency || - aborted); - _cleanupReshardingCollections(aborted); - }) - .then([this, aborted] { - if (_recipientCtx.getState() != RecipientStateEnum::kDone) { - // If a failover occured before removing the recipient document, the - // recipient could already be in state done. 
- _transitionState(RecipientStateEnum::kDone); - } - - if (!_isAlsoDonor) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - - _externalState->clearFilteringMetadata(opCtx.get()); - - RecoverableCriticalSectionService::get(opCtx.get()) - ->releaseRecoverableCriticalSection( - opCtx.get(), - _metadata.getSourceNss(), - _critSecReason, - ShardingCatalogClient::kLocalWriteConcern); - - _metrics()->leaveCriticalSection(getCurrentTime()); - } - }) - .then([this, executor] { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - return _updateCoordinator(opCtx.get(), executor); - }) - .then([this, aborted] { - { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - removeRecipientDocFailpoint.pauseWhileSet(opCtx.get()); - } - _removeRecipientDocument(aborted); - }); - }) + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor, aborted, stepdownToken](const auto& factory) { + return ExecutorFuture<void>(**executor) + .then([this, executor, aborted, stepdownToken, &factory] { + if (aborted) { + return future_util::withCancellation( + _dataReplicationQuiesced.thenRunOn(**executor), stepdownToken) + .thenRunOn(**executor) + .onError([](Status status) { + // Wait for all of the data replication components to halt. We + // ignore any errors because resharding is known to have failed + // already. + return Status::OK(); + }); + } else { + _renameTemporaryReshardingCollection(factory); + return ExecutorFuture<void>(**executor, Status::OK()); + } + }) + .then([this, aborted, &factory] { + // It is safe to drop the oplog collections once either (1) the + // collection is renamed or (2) the operation is aborting. 
+ invariant(_recipientCtx.getState() >= RecipientStateEnum::kStrictConsistency || + aborted); + _cleanupReshardingCollections(aborted, factory); + }) + .then([this, aborted, &factory] { + if (_recipientCtx.getState() != RecipientStateEnum::kDone) { + // If a failover occurred before removing the recipient document, the + // recipient could already be in state done. + _transitionState(RecipientStateEnum::kDone, factory); + } + + if (!_isAlsoDonor) { + auto opCtx = factory.makeOperationContext(&cc()); + + _externalState->clearFilteringMetadata(opCtx.get()); + + RecoverableCriticalSectionService::get(opCtx.get()) + ->releaseRecoverableCriticalSection( + opCtx.get(), + _metadata.getSourceNss(), + _critSecReason, + ShardingCatalogClient::kLocalWriteConcern); + + _metrics()->leaveCriticalSection(getCurrentTime()); + } + }) + .then([this, executor, &factory] { + auto opCtx = factory.makeOperationContext(&cc()); + return _updateCoordinator(opCtx.get(), executor, factory); + }) + .then([this, aborted, &factory] { + { + auto opCtx = factory.makeOperationContext(&cc()); + removeRecipientDocFailpoint.pauseWhileSet(opCtx.get()); + } + _removeRecipientDocument(aborted, factory); + }); + }) .onTransientError([](const Status& status) { LOGV2(5551103, "Transient error while finishing resharding operation", - "error"_attr = status); + "error"_attr = redact(status)); }) .onUnrecoverableError([](const Status& status) {}) .until<Status>([](const Status& status) { return status.isOK(); }) @@ -367,10 +373,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( const CancellationToken& stepdownToken) noexcept { auto abortToken = _initAbortSource(stepdownToken); _markKilledExecutor->startup(); - _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); + _retryingCancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); return ExecutorFuture<void>(**executor) - .then([this] { _startMetrics(); }) + .then([this, executor, abortToken] { return 
_startMetrics(executor, abortToken); }) .then([this, executor, abortToken] { return _runUntilStrictConsistencyOrErrored(executor, abortToken); }) @@ -378,7 +384,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( return _notifyCoordinatorAndAwaitDecision(executor, abortToken); }) .onCompletion([this, executor, stepdownToken, abortToken](Status status) { - _cancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor); + _retryingCancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor); if (stepdownToken.isCanceled()) { // Propagate any errors from the recipient stepping down. return ExecutorFuture<bool>(**executor, status); @@ -467,7 +473,8 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken) { + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory) { if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { invariant(_cloneTimestamp); return ExecutorFuture(**executor); @@ -475,23 +482,24 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: return future_util::withCancellation(_allDonorsPreparedToDonate.getFuture(), abortToken) .thenRunOn(**executor) - .then([this, executor]( + .then([this, executor, &factory]( ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) { - _transitionToCreatingCollection(cloneDetails, - (*executor)->now() + _minimumOperationDuration); + _transitionToCreatingCollection( + cloneDetails, (*executor)->now() + _minimumOperationDuration, factory); _metrics()->setDocumentsToCopy(cloneDetails.approxDocumentsToCopy, cloneDetails.approxBytesToCopy); }); } void ReshardingRecipientService::RecipientStateMachine:: - 
_createTemporaryReshardingCollectionThenTransitionToCloning() { + _createTemporaryReshardingCollectionThenTransitionToCloning( + const CancelableOperationContextFactory& factory) { if (_recipientCtx.getState() > RecipientStateEnum::kCreatingCollection) { return; } { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto opCtx = factory.makeOperationContext(&cc()); _externalState->ensureTempReshardingCollectionExistsWithIndexes( opCtx.get(), _metadata, *_cloneTimestamp); @@ -511,7 +519,7 @@ void ReshardingRecipientService::RecipientStateMachine:: }); } - _transitionToCloning(); + _transitionToCloning(factory); } std::unique_ptr<ReshardingDataReplicationInterface> @@ -536,7 +544,8 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationStarted( OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken) { + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory) { const bool cloningDone = _recipientCtx.getState() > RecipientStateEnum::kCloning; if (!_dataReplication) { @@ -548,7 +557,7 @@ void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationSt ->runUntilStrictlyConsistent(**executor, _recipientService->getInstanceCleanupExecutor(), abortToken, - *_cancelableOpCtxFactory, + factory, txnCloneTime.get()) .share(); @@ -564,19 +573,20 @@ void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationSt ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken) { + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory) { if (_recipientCtx.getState() > RecipientStateEnum::kCloning) { return ExecutorFuture(**executor); } { - auto opCtx 
= _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto opCtx = factory.makeOperationContext(&cc()); reshardingPauseRecipientBeforeCloning.pauseWhileSet(opCtx.get()); } { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - _ensureDataReplicationStarted(opCtx.get(), executor, abortToken); + auto opCtx = factory.makeOperationContext(&cc()); + _ensureDataReplicationStarted(opCtx.get(), executor, abortToken, factory); } reshardingRecipientFailsAfterTransitionToCloning.execute([&](const BSONObj& data) { @@ -585,30 +595,31 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin }); { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto opCtx = factory.makeOperationContext(&cc()); reshardingPauseRecipientDuringCloning.pauseWhileSet(opCtx.get()); } return future_util::withCancellation(_dataReplication->awaitCloningDone(), abortToken) .thenRunOn(**executor) - .then([this] { _transitionToApplying(); }); + .then([this, &factory] { _transitionToApplying(factory); }); } ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken) { + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory) { if (_recipientCtx.getState() > RecipientStateEnum::kApplying) { return ExecutorFuture<void>(**executor, Status::OK()); } { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - _ensureDataReplicationStarted(opCtx.get(), executor, abortToken); + auto opCtx = factory.makeOperationContext(&cc()); + _ensureDataReplicationStarted(opCtx.get(), executor, abortToken, factory); } - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - return _updateCoordinator(opCtx.get(), executor) + auto opCtx = factory.makeOperationContext(&cc()); + return _updateCoordinator(opCtx.get(), executor, 
factory) .then([this, abortToken] { { auto opCtx = cc().makeOperationContext(); @@ -618,8 +629,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: return future_util::withCancellation(_dataReplication->awaitStrictlyConsistent(), abortToken); }) - .then([this] { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + .then([this, &factory] { + auto opCtx = factory.makeOperationContext(&cc()); for (const auto& donor : _donorShards) { auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId()); @@ -629,9 +640,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: !stashColl || stashColl->isEmpty(opCtx.get())); } }) - .then([this] { + .then([this, &factory] { if (!_isAlsoDonor) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto opCtx = factory.makeOperationContext(&cc()); RecoverableCriticalSectionService::get(opCtx.get()) ->acquireRecoverableCriticalSectionBlockWrites( opCtx.get(), @@ -642,13 +653,14 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _metrics()->enterCriticalSection(getCurrentTime()); } - _transitionToStrictConsistency(); - _writeStrictConsistencyOplog(); + _transitionToStrictConsistency(factory); + _writeStrictConsistencyOplog(factory); }); } -void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyOplog() { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); +void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyOplog( + const CancelableOperationContextFactory& factory) { + auto opCtx = factory.makeOperationContext(&cc()); auto rawOpCtx = opCtx.get(); auto generateOplogEntry = [&]() { @@ -683,13 +695,14 @@ void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyO }); } -void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection() { +void 
ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection( + const CancelableOperationContextFactory& factory) { if (_recipientCtx.getState() == RecipientStateEnum::kDone) { return; } if (!_isAlsoDonor) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto opCtx = factory.makeOperationContext(&cc()); RecoverableCriticalSectionService::get(opCtx.get()) ->promoteRecoverableCriticalSectionToBlockAlsoReads( @@ -703,8 +716,8 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi } void ReshardingRecipientService::RecipientStateMachine::_cleanupReshardingCollections( - bool aborted) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + bool aborted, const CancelableOperationContextFactory& factory) { + auto opCtx = factory.makeOperationContext(&cc()); resharding::data_copy::ensureOplogCollectionsDropped( opCtx.get(), _metadata.getReshardingUUID(), _metadata.getSourceUUID(), _donorShards); @@ -715,19 +728,20 @@ void ReshardingRecipientService::RecipientStateMachine::_cleanupReshardingCollec } void ReshardingRecipientService::RecipientStateMachine::_transitionState( - RecipientStateEnum newState) { + RecipientStateEnum newState, const CancelableOperationContextFactory& factory) { invariant(newState != RecipientStateEnum::kCreatingCollection && newState != RecipientStateEnum::kError); auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(newState); - _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); } void ReshardingRecipientService::RecipientStateMachine::_transitionState( RecipientShardContext&& newRecipientCtx, boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&& cloneDetails, - boost::optional<mongo::Date_t> configStartTime) { + boost::optional<mongo::Date_t> configStartTime, + const CancelableOperationContextFactory& factory) { 
invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp); // For logging purposes. @@ -735,7 +749,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( auto newState = newRecipientCtx.getState(); _updateRecipientDocument( - std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime)); + std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory); _metrics()->setRecipientState(newState); @@ -750,42 +764,49 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection( ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails, - const boost::optional<mongo::Date_t> startConfigTxnCloneTime) { + const boost::optional<mongo::Date_t> startConfigTxnCloneTime, + const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection); - _transitionState( - std::move(newRecipientCtx), std::move(cloneDetails), std::move(startConfigTxnCloneTime)); + _transitionState(std::move(newRecipientCtx), + std::move(cloneDetails), + std::move(startConfigTxnCloneTime), + factory); } -void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning() { +void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning( + const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kCloning); - _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); _metrics()->startCopyingDocuments(getCurrentTime()); } -void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying() { +void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying( + const 
CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kApplying); - _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); auto currentTime = getCurrentTime(); _metrics()->endCopyingDocuments(currentTime); _metrics()->startApplyingOplogEntries(currentTime); } -void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency() { +void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency( + const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency); - _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); auto currentTime = getCurrentTime(); _metrics()->endApplyingOplogEntries(currentTime); } -void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) { +void ReshardingRecipientService::RecipientStateMachine::_transitionToError( + Status abortReason, const CancelableOperationContextFactory& factory) { auto newRecipientCtx = _recipientCtx; newRecipientCtx.setState(RecipientStateEnum::kError); emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason); - _transitionState(std::move(newRecipientCtx), boost::none, boost::none); + _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory); } /** @@ -856,14 +877,16 @@ BSONObj ReshardingRecipientService::RecipientStateMachine::_makeQueryForCoordina } ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateCoordinator( - OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + OperationContext* opCtx, + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const 
CancelableOperationContextFactory& factory) { repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); return WaitForMajorityService::get(opCtx->getServiceContext()) .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) .thenRunOn(**executor) - .then([this] { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + .then([this, &factory] { + auto opCtx = factory.makeOperationContext(&cc()); auto shardId = _externalState->myShardId(opCtx->getServiceContext()); BSONObjBuilder updateBuilder; @@ -904,8 +927,9 @@ void ReshardingRecipientService::RecipientStateMachine::commit() { void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument( RecipientShardContext&& newRecipientCtx, boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&& cloneDetails, - boost::optional<mongo::Date_t> configStartTime) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + boost::optional<mongo::Date_t> configStartTime, + const CancelableOperationContextFactory& factory) { + auto opCtx = factory.makeOperationContext(&cc()); PersistentTaskStore<ReshardingRecipientDocument> store( NamespaceString::kRecipientReshardingOperationsNamespace); @@ -957,8 +981,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument } } -void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument(bool aborted) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); +void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument( + bool aborted, const CancelableOperationContextFactory& factory) { + auto opCtx = factory.makeOperationContext(&cc()); const auto& nss = NamespaceString::kRecipientReshardingOperationsNamespace; writeConflictRetry( @@ -1004,31 +1029,53 @@ ReshardingMetrics* 
ReshardingRecipientService::RecipientStateMachine::_metrics() return ReshardingMetrics::get(cc().getServiceContext()); } -void ReshardingRecipientService::RecipientStateMachine::_startMetrics() { +ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient); - _restoreMetrics(); - } else { - _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime()); + return _restoreMetricsWithRetry(executor, abortToken); } + _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime()); + return ExecutorFuture<void>(**executor); } -void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() { +ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { _metrics()->setRecipientState(_recipientCtx.getState()); + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry( + [this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); }) + .onTransientError([](const Status& status) { + LOGV2( + 5992700, "Transient error while restoring metrics", "error"_attr = redact(status)); + }) + .onUnrecoverableError([](const Status& status) {}) + .until<Status>([](const Status& status) { return status.isOK(); }) + .on(**executor, abortToken); +} + +void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( + const CancelableOperationContextFactory& factory) { + int64_t documentCountCopied = 0; + int64_t documentBytesCopied = 0; + int64_t oplogEntriesFetched = 0; + int64_t oplogEntriesApplied = 0; - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto opCtx = factory.makeOperationContext(&cc()); 
{ AutoGetCollection tempReshardingColl( opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS); if (tempReshardingColl) { - int64_t bytesCopied = tempReshardingColl->dataSize(opCtx.get()); - int64_t documentsCopied = tempReshardingColl->numRecords(opCtx.get()); - if (bytesCopied > 0) { - _metrics()->onDocumentsCopiedForCurrentOp(documentsCopied, bytesCopied); - } + documentBytesCopied = tempReshardingColl->dataSize(opCtx.get()); + documentCountCopied = tempReshardingColl->numRecords(opCtx.get()); } } + reshardingOpCtxKilledWhileRestoringMetrics.execute( + [&opCtx](const BSONObj& data) { opCtx->markKilled(); }); + for (const auto& donor : _donorShards) { { AutoGetCollection oplogBufferColl( @@ -1036,9 +1083,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() { getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()), MODE_IS); if (oplogBufferColl) { - int64_t recordsFetched = oplogBufferColl->numRecords(opCtx.get()); - if (recordsFetched > 0) - _metrics()->onOplogEntriesFetchedForCurrentOp(recordsFetched); + oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get()); } } @@ -1056,13 +1101,16 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() { result); if (!result.isEmpty()) { - _metrics()->onOplogEntriesAppliedForCurrentOp( + oplogEntriesApplied += result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName) - .Long()); + .Long(); } } } } + + _metrics()->restoreForCurrentOp( + documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied); } CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index e88d50a2330..3e9a56b4646 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -32,6 +32,7 @@ #include 
"mongo/db/repl/primary_only_service.h" #include "mongo/db/s/resharding/recipient_document_gen.h" #include "mongo/db/s/resharding/resharding_data_replication.h" +#include "mongo/db/s/resharding/resharding_future_util.h" #include "mongo/db/s/resharding_util.h" #include "mongo/s/resharding/type_collection_fields_gen.h" #include "mongo/util/concurrency/thread_pool.h" @@ -176,56 +177,67 @@ private: // The following functions correspond to the actions to take at a particular recipient state. ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); - void _createTemporaryReshardingCollectionThenTransitionToCloning(); + void _createTemporaryReshardingCollectionThenTransitionToCloning( + const CancelableOperationContextFactory& factory); ExecutorFuture<void> _cloneThenTransitionToApplying( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); - void _writeStrictConsistencyOplog(); + void _writeStrictConsistencyOplog(const CancelableOperationContextFactory& factory); - void _renameTemporaryReshardingCollection(); + void _renameTemporaryReshardingCollection(const CancelableOperationContextFactory& factory); - void _cleanupReshardingCollections(bool aborted); + void _cleanupReshardingCollections(bool aborted, + const CancelableOperationContextFactory& factory); // Transitions the on-disk and in-memory state to 'newState'. 
- void _transitionState(RecipientStateEnum newState); + void _transitionState(RecipientStateEnum newState, + const CancelableOperationContextFactory& factory); void _transitionState(RecipientShardContext&& newRecipientCtx, boost::optional<CloneDetails>&& cloneDetails, - boost::optional<mongo::Date_t> configStartTime); + boost::optional<mongo::Date_t> configStartTime, + const CancelableOperationContextFactory& factory); // The following functions transition the on-disk and in-memory state to the named state. void _transitionToCreatingCollection(CloneDetails cloneDetails, - boost::optional<mongo::Date_t> startConfigTxnCloneTime); + boost::optional<mongo::Date_t> startConfigTxnCloneTime, + const CancelableOperationContextFactory& factory); - void _transitionToCloning(); + void _transitionToCloning(const CancelableOperationContextFactory& factory); - void _transitionToApplying(); + void _transitionToApplying(const CancelableOperationContextFactory& factory); - void _transitionToStrictConsistency(); + void _transitionToStrictConsistency(const CancelableOperationContextFactory& factory); - void _transitionToError(Status abortReason); + void _transitionToError(Status abortReason, const CancelableOperationContextFactory& factory); BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState); ExecutorFuture<void> _updateCoordinator( - OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + OperationContext* opCtx, + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancelableOperationContextFactory& factory); // Updates the mutable portion of the on-disk and in-memory recipient document with // 'newRecipientCtx', 'fetchTimestamp and 'donorShards'. 
void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx, boost::optional<CloneDetails>&& cloneDetails, - boost::optional<mongo::Date_t> configStartTime); + boost::optional<mongo::Date_t> configStartTime, + const CancelableOperationContextFactory& factory); // Removes the local recipient document from disk. - void _removeRecipientDocument(bool aborted); + void _removeRecipientDocument(bool aborted, const CancelableOperationContextFactory& factory); std::unique_ptr<ReshardingDataReplicationInterface> _makeDataReplication( OperationContext* opCtx, bool cloningDone); @@ -233,14 +245,20 @@ private: void _ensureDataReplicationStarted( OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken); + const CancellationToken& abortToken, + const CancelableOperationContextFactory& factory); ReshardingMetrics* _metrics() const; - void _startMetrics(); + ExecutorFuture<void> _startMetrics( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); // Restore metrics using the persisted metrics after stepping up. - void _restoreMetrics(); + ExecutorFuture<void> _restoreMetricsWithRetry( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); + void _restoreMetrics(const CancelableOperationContextFactory& factory); // Initializes the _abortSource and generates a token from it to return back the caller. // @@ -271,7 +289,8 @@ private: // CancelableOperationContext must have a thread that is always available to it to mark its // opCtx as killed when the cancelToken has been cancelled. 
const std::shared_ptr<ThreadPool> _markKilledExecutor; - boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory; + boost::optional<resharding::RetryingCancelableOperationContextFactory> + _retryingCancelableOpCtxFactory; const ReshardingDataReplicationFactory _dataReplicationFactory; SharedSemiFuture<void> _dataReplicationQuiesced; diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 5d69b790321..b93d43107f7 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -432,6 +432,49 @@ TEST_F(ReshardingRecipientServiceTest, StepDownStepUpEachTransition) { } } +TEST_F(ReshardingRecipientServiceTest, OpCtxKilledWhileRestoringMetrics) { + for (bool isAlsoDonor : {false, true}) { + LOGV2(5992701, + "Running case", + "test"_attr = _agent.getTestName(), + "isAlsoDonor"_attr = isAlsoDonor); + + // Initialize recipient. + auto doc = makeStateDocument(isAlsoDonor); + auto instanceId = + BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID()); + auto opCtx = makeOperationContext(); + if (isAlsoDonor) { + createSourceCollection(opCtx.get(), doc); + } + RecipientStateMachine::insertStateDocument(opCtx.get(), doc); + auto recipient = RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + + // In order to restore metrics, metrics need to exist in the first place, so put the + // recipient in the cloning state, then step down. + PauseDuringStateTransitions stateTransitionsGuard{controller(), + RecipientStateEnum::kCloning}; + notifyToStartCloning(opCtx.get(), *recipient, doc); + stateTransitionsGuard.wait(RecipientStateEnum::kCloning); + stepDown(); + stateTransitionsGuard.unset(RecipientStateEnum::kCloning); + recipient.reset(); + + // Enable failpoint and step up. 
+ auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics"); + fp->setMode(FailPoint::nTimes, 1); + stepUp(opCtx.get()); + + // After the failpoint is disabled, the operation should succeed. + auto maybeRecipient = RecipientStateMachine::lookup(opCtx.get(), _service, instanceId); + ASSERT_TRUE(bool(maybeRecipient)); + recipient = *maybeRecipient; + notifyReshardingCommitting(opCtx.get(), *recipient, doc); + ASSERT_OK(recipient->getCompletionFuture().getNoThrow()); + checkStateDocumentRemoved(opCtx.get()); + } +} + DEATH_TEST_REGEX_F(ReshardingRecipientServiceTest, CommitFn, "4457001.*tripwire") { auto doc = makeStateDocument(false /* isAlsoDonor */); auto opCtx = makeOperationContext(); @@ -767,7 +810,6 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { default: break; } - // Step down before the transition to state can complete. stateTransitionsGuard.wait(state); if (state == RecipientStateEnum::kStrictConsistency) { @@ -801,6 +843,12 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { ErrorCodes::InterruptedDueToReplStateChange); prevState = state; + if (state == RecipientStateEnum::kApplying || + state == RecipientStateEnum::kStrictConsistency) { + // If metrics are being verified in the next pass, ensure a retry does not alter values. + auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics"); + fp->setMode(FailPoint::nTimes, 1); + } recipient.reset(); if (state != RecipientStateEnum::kDone) |