-rw-r--r--  src/mongo/db/s/resharding/resharding_future_util.h              |  20
-rw-r--r--  src/mongo/db/s/resharding/resharding_metrics.cpp                |  37
-rw-r--r--  src/mongo/db/s/resharding/resharding_metrics.h                  |   9
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp      | 384
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h        |  61
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service_test.cpp |  50
6 files changed, 348 insertions, 213 deletions
diff --git a/src/mongo/db/s/resharding/resharding_future_util.h b/src/mongo/db/s/resharding/resharding_future_util.h
index 6ca27ed3e78..7ebb8c1b31d 100644
--- a/src/mongo/db/s/resharding/resharding_future_util.h
+++ b/src/mongo/db/s/resharding/resharding_future_util.h
@@ -31,6 +31,7 @@
#include <vector>
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
@@ -167,5 +168,24 @@ private:
unique_function<void(const Status&)> _onUnrecoverableError;
};
+/**
+ * Wrapper class around CancelableOperationContextFactory which uses resharding::WithAutomaticRetry
+ * to ensure all cancelable operations are retried on failure whenever possible.
+ */
+class RetryingCancelableOperationContextFactory {
+public:
+ RetryingCancelableOperationContextFactory(CancellationToken cancelToken, ExecutorPtr executor)
+ : _factory{std::move(cancelToken), std::move(executor)} {}
+
+ template <typename BodyCallable>
+ decltype(auto) withAutomaticRetry(BodyCallable&& body) const {
+ return resharding::WithAutomaticRetry([this, body]() { return body(_factory); });
+ }
+
+private:
+ const CancelableOperationContextFactory _factory;
+};
+
} // namespace resharding
} // namespace mongo
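
For orientation, a minimal usage sketch of the new wrapper, mirroring the chaining at the call sites in resharding_recipient_service.cpp below; retryingFactory, markKilledExecutor, doWork(), and the log id are hypothetical:

    resharding::RetryingCancelableOperationContextFactory retryingFactory{abortToken,
                                                                          markKilledExecutor};
    retryingFactory
        .withAutomaticRetry([](const CancelableOperationContextFactory& factory) {
            // Each attempt gets a fresh cancelable opCtx; doWork() is a placeholder.
            auto opCtx = factory.makeOperationContext(&cc());
            doWork(opCtx.get());
        })
        .onTransientError([](const Status& status) {
            LOGV2(9999900, "Transient error in retryable body", "error"_attr = redact(status));
        })
        .onUnrecoverableError([](const Status& status) {})
        .until<Status>([](const Status& status) { return status.isOK(); })
        .on(**executor, abortToken);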
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 8ba5e591e73..be231bce447 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -44,6 +44,7 @@ namespace mongo {
namespace {
constexpr auto kAnotherOperationInProgress = "Another operation is in progress";
constexpr auto kNoOperationInProgress = "No operation is in progress";
+constexpr auto kMetricsSetBeforeRestore = "Expected metrics to be 0 prior to restore";
constexpr auto kTotalOps = "countReshardingOperations";
constexpr auto kSuccessfulOps = "countReshardingSuccessful";
@@ -564,16 +565,10 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex
invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
- onDocumentsCopiedForCurrentOp(documents, bytes);
- _cumulativeOp->documentsCopied += documents;
- _cumulativeOp->bytesCopied += bytes;
-}
-
-void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
-
_currentOp->documentsCopied += documents;
_currentOp->bytesCopied += bytes;
+ _cumulativeOp->documentsCopied += documents;
+ _cumulativeOp->bytesCopied += bytes;
}
void ReshardingMetrics::gotInserts(int n) noexcept {
@@ -654,14 +649,8 @@ void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
*_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
- onOplogEntriesFetchedForCurrentOp(entries);
- _cumulativeOp->oplogEntriesFetched += entries;
-}
-
-void ReshardingMetrics::onOplogEntriesFetchedForCurrentOp(int64_t entries) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
-
_currentOp->oplogEntriesFetched += entries;
+ _cumulativeOp->oplogEntriesFetched += entries;
}
void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
@@ -672,14 +661,24 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
- onOplogEntriesAppliedForCurrentOp(entries);
+ _currentOp->oplogEntriesApplied += entries;
_cumulativeOp->oplogEntriesApplied += entries;
}
-void ReshardingMetrics::onOplogEntriesAppliedForCurrentOp(int64_t entries) noexcept {
+void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied,
+ int64_t documentBytesCopied,
+ int64_t oplogEntriesFetched,
+ int64_t oplogEntriesApplied) noexcept {
invariant(_currentOp, kNoOperationInProgress);
-
- _currentOp->oplogEntriesApplied += entries;
+ invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
+
+ _currentOp->documentsCopied = documentCountCopied;
+ _currentOp->bytesCopied = documentBytesCopied;
+ _currentOp->oplogEntriesFetched = oplogEntriesFetched;
+ _currentOp->oplogEntriesApplied = oplogEntriesApplied;
}
void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
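
A usage sketch of the new restore entry point (values hypothetical): unlike the incremental on*() methods it assigns rather than adds, and the invariants above require it to be the first metrics update for the current operation:

    // Hypothetical values; calling this after any incremental update would trip
    // the kMetricsSetBeforeRestore invariants.
    ReshardingMetrics::get(cc().getServiceContext())
        ->restoreForCurrentOp(/*documentCountCopied=*/1000,
                              /*documentBytesCopied=*/512000,
                              /*oplogEntriesFetched=*/42,
                              /*oplogEntriesApplied=*/42);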
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index 24ab1a9a38a..480dd25ed8a 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -84,8 +84,6 @@ public:
void setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept;
// Allows updating metrics on "documents to copy" so long as the recipient is in cloning state.
void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept;
- // Allows updating metrics on "documents to copy".
- void onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept;
// Allows updating metrics on "opcounters";
void gotInserts(int n) noexcept;
@@ -116,9 +114,12 @@ public:
// Allows updating "oplog entries to apply" metrics when the recipient is in applying state.
void onOplogEntriesFetched(int64_t entries) noexcept;
- // Allows restoring "oplog entries to apply" metrics.
- void onOplogEntriesFetchedForCurrentOp(int64_t entries) noexcept;
void onOplogEntriesApplied(int64_t entries) noexcept;
- void onOplogEntriesAppliedForCurrentOp(int64_t entries) noexcept;
+
+ void restoreForCurrentOp(int64_t documentCountCopied,
+ int64_t documentBytesCopied,
+ int64_t oplogEntriesFetched,
+ int64_t oplogEntriesApplied) noexcept;
// Allows tracking writes during a critical section when the donor's state is either of
// "donating-oplog-entries" or "blocking-writes".
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 938fbefb2aa..380fb5c5616 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -74,6 +74,7 @@ MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint);
MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientBeforeCloning);
MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringCloning);
MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringOplogApplication);
+MONGO_FAIL_POINT_DEFINE(reshardingOpCtxKilledWhileRestoringMetrics);
MONGO_FAIL_POINT_DEFINE(reshardingRecipientFailsAfterTransitionToCloning);
namespace {
@@ -168,25 +169,28 @@ ExecutorFuture<void>
ReshardingRecipientService::RecipientStateMachine::_runUntilStrictConsistencyOrErrored(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) noexcept {
- return resharding::WithAutomaticRetry([this, executor, abortToken] {
- return ExecutorFuture(**executor)
- .then([this, executor, abortToken] {
- return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
- executor, abortToken);
- })
- .then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); })
- .then([this, executor, abortToken] {
- return _cloneThenTransitionToApplying(executor, abortToken);
- })
- .then([this, executor, abortToken] {
- return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
- executor, abortToken);
- });
- })
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor, abortToken](const auto& factory) {
+ return ExecutorFuture(**executor)
+ .then([this, executor, abortToken, &factory] {
+ return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
+ executor, abortToken, factory);
+ })
+ .then([this, &factory] {
+ _createTemporaryReshardingCollectionThenTransitionToCloning(factory);
+ })
+ .then([this, executor, abortToken, &factory] {
+ return _cloneThenTransitionToApplying(executor, abortToken, factory);
+ })
+ .then([this, executor, abortToken, &factory] {
+ return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
+ executor, abortToken, factory);
+ });
+ })
.onTransientError([](const Status& status) {
LOGV2(5551100,
"Recipient _runUntilStrictConsistencyOrErrored encountered transient error",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([abortToken](const Status& status) { return status.isOK(); })
@@ -200,23 +204,24 @@ ReshardingRecipientService::RecipientStateMachine::_runUntilStrictConsistencyOrE
"Resharding operation recipient state machine failed",
"namespace"_attr = _metadata.getSourceNss(),
"reshardingUUID"_attr = _metadata.getReshardingUUID(),
- "error"_attr = status);
-
- return resharding::WithAutomaticRetry([this, status] {
- // It is illegal to transition into kError if the state has already surpassed
- // kStrictConsistency.
- invariant(_recipientCtx.getState() < RecipientStateEnum::kStrictConsistency);
- _transitionToError(status);
-
- // Intentionally swallow the error - by transitioning to kError, the
- // recipient effectively recovers from encountering the error and
- // should continue running in the future chain.
- })
+ "error"_attr = redact(status));
+
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, status](const auto& factory) {
+ // It is illegal to transition into kError if the state has already surpassed
+ // kStrictConsistency.
+ invariant(_recipientCtx.getState() < RecipientStateEnum::kStrictConsistency);
+ _transitionToError(status, factory);
+
+ // Intentionally swallow the error - by transitioning to kError, the
+ // recipient effectively recovers from encountering the error and
+ // should continue running in the future chain.
+ })
.onTransientError([](const Status& status) {
LOGV2(5551104,
"Recipient _runUntilStrictConsistencyOrErrored encountered transient "
"error while transitioning to state kError",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& retryStatus) { return retryStatus.isOK(); })
@@ -248,15 +253,16 @@ ReshardingRecipientService::RecipientStateMachine::_notifyCoordinatorAndAwaitDec
return ExecutorFuture(**executor);
}
- return resharding::WithAutomaticRetry([this, executor] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return _updateCoordinator(opCtx.get(), executor);
- })
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor](const auto& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
+ return _updateCoordinator(opCtx.get(), executor, factory);
+ })
.onTransientError([](const Status& status) {
LOGV2(5551102,
"Transient error while notifying coordinator of recipient state for the "
"coordinator's decision",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& status) { return status.isOK(); })
@@ -271,70 +277,70 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_finishR
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& stepdownToken,
bool aborted) noexcept {
- return resharding::WithAutomaticRetry([this, executor, aborted, stepdownToken] {
- return ExecutorFuture<void>(**executor)
- .then([this, executor, aborted, stepdownToken] {
- if (aborted) {
- return future_util::withCancellation(
- _dataReplicationQuiesced.thenRunOn(**executor), stepdownToken)
- .thenRunOn(**executor)
- .onError([](Status status) {
- // Wait for all of the data replication components to halt. We
- // ignore any errors because resharding is known to have failed
- // already.
- return Status::OK();
- });
- } else {
- _renameTemporaryReshardingCollection();
- return ExecutorFuture<void>(**executor, Status::OK());
- }
- })
- .then([this, aborted] {
- // It is safe to drop the oplog collections once either (1) the
- // collection is renamed or (2) the operation is aborting.
- invariant(_recipientCtx.getState() >=
- RecipientStateEnum::kStrictConsistency ||
- aborted);
- _cleanupReshardingCollections(aborted);
- })
- .then([this, aborted] {
- if (_recipientCtx.getState() != RecipientStateEnum::kDone) {
- // If a failover occured before removing the recipient document, the
- // recipient could already be in state done.
- _transitionState(RecipientStateEnum::kDone);
- }
-
- if (!_isAlsoDonor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-
- _externalState->clearFilteringMetadata(opCtx.get());
-
- RecoverableCriticalSectionService::get(opCtx.get())
- ->releaseRecoverableCriticalSection(
- opCtx.get(),
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kLocalWriteConcern);
-
- _metrics()->leaveCriticalSection(getCurrentTime());
- }
- })
- .then([this, executor] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return _updateCoordinator(opCtx.get(), executor);
- })
- .then([this, aborted] {
- {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- removeRecipientDocFailpoint.pauseWhileSet(opCtx.get());
- }
- _removeRecipientDocument(aborted);
- });
- })
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor, aborted, stepdownToken](const auto& factory) {
+ return ExecutorFuture<void>(**executor)
+ .then([this, executor, aborted, stepdownToken, &factory] {
+ if (aborted) {
+ return future_util::withCancellation(
+ _dataReplicationQuiesced.thenRunOn(**executor), stepdownToken)
+ .thenRunOn(**executor)
+ .onError([](Status status) {
+ // Wait for all of the data replication components to halt. We
+ // ignore any errors because resharding is known to have failed
+ // already.
+ return Status::OK();
+ });
+ } else {
+ _renameTemporaryReshardingCollection(factory);
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+ })
+ .then([this, aborted, &factory] {
+ // It is safe to drop the oplog collections once either (1) the
+ // collection is renamed or (2) the operation is aborting.
+ invariant(_recipientCtx.getState() >= RecipientStateEnum::kStrictConsistency ||
+ aborted);
+ _cleanupReshardingCollections(aborted, factory);
+ })
+ .then([this, aborted, &factory] {
+ if (_recipientCtx.getState() != RecipientStateEnum::kDone) {
+ // If a failover occurred before removing the recipient document, the
+ // recipient could already be in state done.
+ _transitionState(RecipientStateEnum::kDone, factory);
+ }
+
+ if (!_isAlsoDonor) {
+ auto opCtx = factory.makeOperationContext(&cc());
+
+ _externalState->clearFilteringMetadata(opCtx.get());
+
+ RecoverableCriticalSectionService::get(opCtx.get())
+ ->releaseRecoverableCriticalSection(
+ opCtx.get(),
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kLocalWriteConcern);
+
+ _metrics()->leaveCriticalSection(getCurrentTime());
+ }
+ })
+ .then([this, executor, &factory] {
+ auto opCtx = factory.makeOperationContext(&cc());
+ return _updateCoordinator(opCtx.get(), executor, factory);
+ })
+ .then([this, aborted, &factory] {
+ {
+ auto opCtx = factory.makeOperationContext(&cc());
+ removeRecipientDocFailpoint.pauseWhileSet(opCtx.get());
+ }
+ _removeRecipientDocument(aborted, factory);
+ });
+ })
.onTransientError([](const Status& status) {
LOGV2(5551103,
"Transient error while finishing resharding operation",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& status) { return status.isOK(); })
@@ -367,10 +373,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
const CancellationToken& stepdownToken) noexcept {
auto abortToken = _initAbortSource(stepdownToken);
_markKilledExecutor->startup();
- _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
+ _retryingCancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
return ExecutorFuture<void>(**executor)
- .then([this] { _startMetrics(); })
+ .then([this, executor, abortToken] { return _startMetrics(executor, abortToken); })
.then([this, executor, abortToken] {
return _runUntilStrictConsistencyOrErrored(executor, abortToken);
})
@@ -378,7 +384,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
return _notifyCoordinatorAndAwaitDecision(executor, abortToken);
})
.onCompletion([this, executor, stepdownToken, abortToken](Status status) {
- _cancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor);
+ _retryingCancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor);
if (stepdownToken.isCanceled()) {
// Propagate any errors from the recipient stepping down.
return ExecutorFuture<bool>(**executor, status);
@@ -467,7 +473,8 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
invariant(_cloneTimestamp);
return ExecutorFuture(**executor);
@@ -475,23 +482,24 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return future_util::withCancellation(_allDonorsPreparedToDonate.getFuture(), abortToken)
.thenRunOn(**executor)
- .then([this, executor](
+ .then([this, executor, &factory](
ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
- _transitionToCreatingCollection(cloneDetails,
- (*executor)->now() + _minimumOperationDuration);
+ _transitionToCreatingCollection(
+ cloneDetails, (*executor)->now() + _minimumOperationDuration, factory);
_metrics()->setDocumentsToCopy(cloneDetails.approxDocumentsToCopy,
cloneDetails.approxBytesToCopy);
});
}
void ReshardingRecipientService::RecipientStateMachine::
- _createTemporaryReshardingCollectionThenTransitionToCloning() {
+ _createTemporaryReshardingCollectionThenTransitionToCloning(
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kCreatingCollection) {
return;
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
_externalState->ensureTempReshardingCollectionExistsWithIndexes(
opCtx.get(), _metadata, *_cloneTimestamp);
@@ -511,7 +519,7 @@ void ReshardingRecipientService::RecipientStateMachine::
});
}
- _transitionToCloning();
+ _transitionToCloning(factory);
}
std::unique_ptr<ReshardingDataReplicationInterface>
@@ -536,7 +544,8 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationStarted(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
const bool cloningDone = _recipientCtx.getState() > RecipientStateEnum::kCloning;
if (!_dataReplication) {
@@ -548,7 +557,7 @@ void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationSt
->runUntilStrictlyConsistent(**executor,
_recipientService->getInstanceCleanupExecutor(),
abortToken,
- *_cancelableOpCtxFactory,
+ factory,
txnCloneTime.get())
.share();
@@ -564,19 +573,20 @@ void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationSt
ExecutorFuture<void>
ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kCloning) {
return ExecutorFuture(**executor);
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
reshardingPauseRecipientBeforeCloning.pauseWhileSet(opCtx.get());
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- _ensureDataReplicationStarted(opCtx.get(), executor, abortToken);
+ auto opCtx = factory.makeOperationContext(&cc());
+ _ensureDataReplicationStarted(opCtx.get(), executor, abortToken, factory);
}
reshardingRecipientFailsAfterTransitionToCloning.execute([&](const BSONObj& data) {
@@ -585,30 +595,31 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
});
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
reshardingPauseRecipientDuringCloning.pauseWhileSet(opCtx.get());
}
return future_util::withCancellation(_dataReplication->awaitCloningDone(), abortToken)
.thenRunOn(**executor)
- .then([this] { _transitionToApplying(); });
+ .then([this, &factory] { _transitionToApplying(factory); });
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kApplying) {
return ExecutorFuture<void>(**executor, Status::OK());
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- _ensureDataReplicationStarted(opCtx.get(), executor, abortToken);
+ auto opCtx = factory.makeOperationContext(&cc());
+ _ensureDataReplicationStarted(opCtx.get(), executor, abortToken, factory);
}
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return _updateCoordinator(opCtx.get(), executor)
+ auto opCtx = factory.makeOperationContext(&cc());
+ return _updateCoordinator(opCtx.get(), executor, factory)
.then([this, abortToken] {
{
auto opCtx = cc().makeOperationContext();
@@ -618,8 +629,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return future_util::withCancellation(_dataReplication->awaitStrictlyConsistent(),
abortToken);
})
- .then([this] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ .then([this, &factory] {
+ auto opCtx = factory.makeOperationContext(&cc());
for (const auto& donor : _donorShards) {
auto stashNss =
getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
@@ -629,9 +640,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
!stashColl || stashColl->isEmpty(opCtx.get()));
}
})
- .then([this] {
+ .then([this, &factory] {
if (!_isAlsoDonor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
RecoverableCriticalSectionService::get(opCtx.get())
->acquireRecoverableCriticalSectionBlockWrites(
opCtx.get(),
@@ -642,13 +653,14 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_metrics()->enterCriticalSection(getCurrentTime());
}
- _transitionToStrictConsistency();
- _writeStrictConsistencyOplog();
+ _transitionToStrictConsistency(factory);
+ _writeStrictConsistencyOplog(factory);
});
}
-void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyOplog() {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyOplog(
+ const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
auto rawOpCtx = opCtx.get();
auto generateOplogEntry = [&]() {
@@ -683,13 +695,14 @@ void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyO
});
}
-void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection() {
+void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection(
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() == RecipientStateEnum::kDone) {
return;
}
if (!_isAlsoDonor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
RecoverableCriticalSectionService::get(opCtx.get())
->promoteRecoverableCriticalSectionToBlockAlsoReads(
@@ -703,8 +716,8 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi
}
void ReshardingRecipientService::RecipientStateMachine::_cleanupReshardingCollections(
- bool aborted) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ bool aborted, const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
resharding::data_copy::ensureOplogCollectionsDropped(
opCtx.get(), _metadata.getReshardingUUID(), _metadata.getSourceUUID(), _donorShards);
@@ -715,19 +728,20 @@ void ReshardingRecipientService::RecipientStateMachine::_cleanupReshardingCollec
}
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
- RecipientStateEnum newState) {
+ RecipientStateEnum newState, const CancelableOperationContextFactory& factory) {
invariant(newState != RecipientStateEnum::kCreatingCollection &&
newState != RecipientStateEnum::kError);
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(newState);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
RecipientShardContext&& newRecipientCtx,
boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime) {
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory) {
invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp);
// For logging purposes.
@@ -735,7 +749,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
auto newState = newRecipientCtx.getState();
_updateRecipientDocument(
- std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime));
+ std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory);
_metrics()->setRecipientState(newState);
@@ -750,42 +764,49 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection(
ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails,
- const boost::optional<mongo::Date_t> startConfigTxnCloneTime) {
+ const boost::optional<mongo::Date_t> startConfigTxnCloneTime,
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection);
- _transitionState(
- std::move(newRecipientCtx), std::move(cloneDetails), std::move(startConfigTxnCloneTime));
+ _transitionState(std::move(newRecipientCtx),
+ std::move(cloneDetails),
+ std::move(startConfigTxnCloneTime),
+ factory);
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning() {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning(
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCloning);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
_metrics()->startCopyingDocuments(getCurrentTime());
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying() {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kApplying);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
auto currentTime = getCurrentTime();
_metrics()->endCopyingDocuments(currentTime);
_metrics()->startApplyingOplogEntries(currentTime);
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency() {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency(
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
auto currentTime = getCurrentTime();
_metrics()->endApplyingOplogEntries(currentTime);
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
+ Status abortReason, const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kError);
emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
}
/**
@@ -856,14 +877,16 @@ BSONObj ReshardingRecipientService::RecipientStateMachine::_makeQueryForCoordina
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateCoordinator(
- OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelableOperationContextFactory& factory) {
repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
return WaitForMajorityService::get(opCtx->getServiceContext())
.waitUntilMajority(clientOpTime, CancellationToken::uncancelable())
.thenRunOn(**executor)
- .then([this] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ .then([this, &factory] {
+ auto opCtx = factory.makeOperationContext(&cc());
auto shardId = _externalState->myShardId(opCtx->getServiceContext());
BSONObjBuilder updateBuilder;
@@ -904,8 +927,9 @@ void ReshardingRecipientService::RecipientStateMachine::commit() {
void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
RecipientShardContext&& newRecipientCtx,
boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
@@ -957,8 +981,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
}
}
-void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument(bool aborted) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument(
+ bool aborted, const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
const auto& nss = NamespaceString::kRecipientReshardingOperationsNamespace;
writeConflictRetry(
@@ -1004,31 +1029,53 @@ ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics()
return ReshardingMetrics::get(cc().getServiceContext());
}
-void ReshardingRecipientService::RecipientStateMachine::_startMetrics() {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
_metrics()->onStepUp(ReshardingMetrics::Role::kRecipient);
- _restoreMetrics();
- } else {
- _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
+ return _restoreMetricsWithRetry(executor, abortToken);
}
+ _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
+ return ExecutorFuture<void>(**executor);
}
-void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
_metrics()->setRecipientState(_recipientCtx.getState());
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry(
+ [this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); })
+ .onTransientError([](const Status& status) {
+ LOGV2(
+ 5992700, "Transient error while restoring metrics", "error"_attr = redact(status));
+ })
+ .onUnrecoverableError([](const Status& status) {})
+ .until<Status>([](const Status& status) { return status.isOK(); })
+ .on(**executor, abortToken);
+}
+
+void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
+ const CancelableOperationContextFactory& factory) {
+ int64_t documentCountCopied = 0;
+ int64_t documentBytesCopied = 0;
+ int64_t oplogEntriesFetched = 0;
+ int64_t oplogEntriesApplied = 0;
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
{
AutoGetCollection tempReshardingColl(
opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS);
if (tempReshardingColl) {
- int64_t bytesCopied = tempReshardingColl->dataSize(opCtx.get());
- int64_t documentsCopied = tempReshardingColl->numRecords(opCtx.get());
- if (bytesCopied > 0) {
- _metrics()->onDocumentsCopiedForCurrentOp(documentsCopied, bytesCopied);
- }
+ documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
+ documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
}
}
+ reshardingOpCtxKilledWhileRestoringMetrics.execute(
+ [&opCtx](const BSONObj& data) { opCtx->markKilled(); });
+
for (const auto& donor : _donorShards) {
{
AutoGetCollection oplogBufferColl(
@@ -1036,9 +1083,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {
getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()),
MODE_IS);
if (oplogBufferColl) {
- int64_t recordsFetched = oplogBufferColl->numRecords(opCtx.get());
- if (recordsFetched > 0)
- _metrics()->onOplogEntriesFetchedForCurrentOp(recordsFetched);
+ oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get());
}
}
@@ -1056,13 +1101,16 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {
result);
if (!result.isEmpty()) {
- _metrics()->onOplogEntriesAppliedForCurrentOp(
+ oplogEntriesApplied +=
result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName)
- .Long());
+ .Long();
}
}
}
}
+
+ _metrics()->restoreForCurrentOp(
+ documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied);
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
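
Design note: _restoreMetrics() is retry-safe because all counts accumulate in function-local variables and are published in a single call, so a body re-run under withAutomaticRetry (e.g. after reshardingOpCtxKilledWhileRestoringMetrics kills the opCtx) cannot double count. In sketch form:

    // Locals are re-zeroed on every retry attempt; restoreForCurrentOp() assigns
    // rather than increments, so the publish step is idempotent across retries.
    int64_t docs = 0, bytes = 0, fetched = 0, applied = 0;
    // ... scan the temp collection, oplog buffers, and applier progress docs ...
    _metrics()->restoreForCurrentOp(docs, bytes, fetched, applied);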
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e88d50a2330..3e9a56b4646 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -32,6 +32,7 @@
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/resharding/recipient_document_gen.h"
#include "mongo/db/s/resharding/resharding_data_replication.h"
+#include "mongo/db/s/resharding/resharding_future_util.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/s/resharding/type_collection_fields_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -176,56 +177,67 @@ private:
// The following functions correspond to the actions to take at a particular recipient state.
ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
- void _createTemporaryReshardingCollectionThenTransitionToCloning();
+ void _createTemporaryReshardingCollectionThenTransitionToCloning(
+ const CancelableOperationContextFactory& factory);
ExecutorFuture<void> _cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
- void _writeStrictConsistencyOplog();
+ void _writeStrictConsistencyOplog(const CancelableOperationContextFactory& factory);
- void _renameTemporaryReshardingCollection();
+ void _renameTemporaryReshardingCollection(const CancelableOperationContextFactory& factory);
- void _cleanupReshardingCollections(bool aborted);
+ void _cleanupReshardingCollections(bool aborted,
+ const CancelableOperationContextFactory& factory);
// Transitions the on-disk and in-memory state to 'newState'.
- void _transitionState(RecipientStateEnum newState);
+ void _transitionState(RecipientStateEnum newState,
+ const CancelableOperationContextFactory& factory);
void _transitionState(RecipientShardContext&& newRecipientCtx,
boost::optional<CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime);
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory);
// The following functions transition the on-disk and in-memory state to the named state.
void _transitionToCreatingCollection(CloneDetails cloneDetails,
- boost::optional<mongo::Date_t> startConfigTxnCloneTime);
+ boost::optional<mongo::Date_t> startConfigTxnCloneTime,
+ const CancelableOperationContextFactory& factory);
- void _transitionToCloning();
+ void _transitionToCloning(const CancelableOperationContextFactory& factory);
- void _transitionToApplying();
+ void _transitionToApplying(const CancelableOperationContextFactory& factory);
- void _transitionToStrictConsistency();
+ void _transitionToStrictConsistency(const CancelableOperationContextFactory& factory);
- void _transitionToError(Status abortReason);
+ void _transitionToError(Status abortReason, const CancelableOperationContextFactory& factory);
BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState);
ExecutorFuture<void> _updateCoordinator(
- OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelableOperationContextFactory& factory);
// Updates the mutable portion of the on-disk and in-memory recipient document with
// 'newRecipientCtx', 'fetchTimestamp and 'donorShards'.
void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx,
boost::optional<CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime);
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory);
// Removes the local recipient document from disk.
- void _removeRecipientDocument(bool aborted);
+ void _removeRecipientDocument(bool aborted, const CancelableOperationContextFactory& factory);
std::unique_ptr<ReshardingDataReplicationInterface> _makeDataReplication(
OperationContext* opCtx, bool cloningDone);
@@ -233,14 +245,20 @@ private:
void _ensureDataReplicationStarted(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
ReshardingMetrics* _metrics() const;
- void _startMetrics();
+ ExecutorFuture<void> _startMetrics(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
// Restore metrics using the persisted metrics after stepping up.
- void _restoreMetrics();
+ ExecutorFuture<void> _restoreMetricsWithRetry(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
+ void _restoreMetrics(const CancelableOperationContextFactory& factory);
// Initializes the _abortSource and generates a token from it to return back the caller.
//
@@ -271,7 +289,8 @@ private:
// CancelableOperationContext must have a thread that is always available to it to mark its
// opCtx as killed when the cancelToken has been cancelled.
const std::shared_ptr<ThreadPool> _markKilledExecutor;
- boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;
+ boost::optional<resharding::RetryingCancelableOperationContextFactory>
+ _retryingCancelableOpCtxFactory;
const ReshardingDataReplicationFactory _dataReplicationFactory;
SharedSemiFuture<void> _dataReplicationQuiesced;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 5d69b790321..b93d43107f7 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -432,6 +432,49 @@ TEST_F(ReshardingRecipientServiceTest, StepDownStepUpEachTransition) {
}
}
+TEST_F(ReshardingRecipientServiceTest, OpCtxKilledWhileRestoringMetrics) {
+ for (bool isAlsoDonor : {false, true}) {
+ LOGV2(5992701,
+ "Running case",
+ "test"_attr = _agent.getTestName(),
+ "isAlsoDonor"_attr = isAlsoDonor);
+
+ // Initialize recipient.
+ auto doc = makeStateDocument(isAlsoDonor);
+ auto instanceId =
+ BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
+ auto opCtx = makeOperationContext();
+ if (isAlsoDonor) {
+ createSourceCollection(opCtx.get(), doc);
+ }
+ RecipientStateMachine::insertStateDocument(opCtx.get(), doc);
+ auto recipient = RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+
+ // In order to restore metrics, metrics need to exist in the first place, so put the
+ // recipient in the cloning state, then step down.
+ PauseDuringStateTransitions stateTransitionsGuard{controller(),
+ RecipientStateEnum::kCloning};
+ notifyToStartCloning(opCtx.get(), *recipient, doc);
+ stateTransitionsGuard.wait(RecipientStateEnum::kCloning);
+ stepDown();
+ stateTransitionsGuard.unset(RecipientStateEnum::kCloning);
+ recipient.reset();
+
+ // Enable failpoint and step up.
+ auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics");
+ fp->setMode(FailPoint::nTimes, 1);
+ stepUp(opCtx.get());
+
+ // After the failpoint is disabled, the operation should succeed.
+ auto maybeRecipient = RecipientStateMachine::lookup(opCtx.get(), _service, instanceId);
+ ASSERT_TRUE(bool(maybeRecipient));
+ recipient = *maybeRecipient;
+ notifyReshardingCommitting(opCtx.get(), *recipient, doc);
+ ASSERT_OK(recipient->getCompletionFuture().getNoThrow());
+ checkStateDocumentRemoved(opCtx.get());
+ }
+}
+
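
The test arms the failpoint with the standard one-shot pattern, sketched here; after nTimes activations the failpoint reverts to off on its own:

    auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics");
    fp->setMode(FailPoint::nTimes, 1);  // trip exactly once, then auto-disarm
    stepUp(opCtx.get());                // the metrics-restore opCtx is killed once, then retried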
DEATH_TEST_REGEX_F(ReshardingRecipientServiceTest, CommitFn, "4457001.*tripwire") {
auto doc = makeStateDocument(false /* isAlsoDonor */);
auto opCtx = makeOperationContext();
@@ -767,7 +810,6 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
default:
break;
}
-
// Step down before the transition to state can complete.
stateTransitionsGuard.wait(state);
if (state == RecipientStateEnum::kStrictConsistency) {
@@ -801,6 +843,12 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
ErrorCodes::InterruptedDueToReplStateChange);
prevState = state;
+ if (state == RecipientStateEnum::kApplying ||
+ state == RecipientStateEnum::kStrictConsistency) {
+ // If metrics are being verified in the next pass, ensure a retry does not alter values.
+ auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics");
+ fp->setMode(FailPoint::nTimes, 1);
+ }
recipient.reset();
if (state != RecipientStateEnum::kDone)