summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrett Nawrocki <brett.nawrocki@mongodb.com>2021-09-29 20:03:54 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-18 19:00:21 +0000
commit8797d34dae0f520c2cb19ff5c40ee3835ef0cc4c (patch)
treea6facbe1650a57353b066c53ef349ace0a3e1064
parent6b406da5f066f6c69478f7934cdb83c0fa5ca7cd (diff)
downloadmongo-r5.1.0-rc1.tar.gz
SERVER-59927 Add retry to _restoreMetrics()r5.1.0-rc1
RecipientStateMachine::_restoreMetrics() performs a number of read operations to calculate the number of documents it cloned, oplog entries it fetched, and oplog entries it applied at the beginning of starting to run again. These read operations may be interrupted if the primary steps down shortly after having been stepped up, which eventually leads to an fassert(). Therefore, perform _restoreMetrics() in a resharding::WithAutomaticRetry() block so any transient errors can be automatically retried and synchronized with the stepdown token being canceled. Furthermore, refactor RecipientStateMachine to use new RetryingCancelableOperationContextFactory to ensure that all usages of CancelableOperationContextFactory occur within a resharding::WithAutomaticRetry() block. Additionally, add a test case that will cover the _restoreMetrics() read operations being interrupted. (cherry picked from commit b9e2784da82fef8e45b95b88e4ac1443649a5b0c)
-rw-r--r--src/mongo/db/s/resharding/resharding_future_util.h20
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp37
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h9
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp384
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h61
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp50
6 files changed, 348 insertions, 213 deletions
diff --git a/src/mongo/db/s/resharding/resharding_future_util.h b/src/mongo/db/s/resharding/resharding_future_util.h
index 6ca27ed3e78..7ebb8c1b31d 100644
--- a/src/mongo/db/s/resharding/resharding_future_util.h
+++ b/src/mongo/db/s/resharding/resharding_future_util.h
@@ -31,6 +31,7 @@
#include <vector>
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
@@ -167,5 +168,24 @@ private:
unique_function<void(const Status&)> _onUnrecoverableError;
};
+/**
+ * Wrapper class around CancelableOperationContextFactory which uses resharding::WithAutomaticRetry
+ * to ensure all cancelable operations will be retried if able upon failure.
+ */
+class RetryingCancelableOperationContextFactory {
+public:
+ RetryingCancelableOperationContextFactory(CancellationToken cancelToken, ExecutorPtr executor)
+ : _factory{std::move(cancelToken), std::move(executor)} {}
+
+ template <typename BodyCallable>
+ decltype(auto) withAutomaticRetry(BodyCallable&& body) const {
+ return resharding::WithAutomaticRetry([this, body]() { return body(_factory); });
+ }
+
+
+private:
+ const CancelableOperationContextFactory _factory;
+};
+
} // namespace resharding
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 8ba5e591e73..be231bce447 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -44,6 +44,7 @@ namespace mongo {
namespace {
constexpr auto kAnotherOperationInProgress = "Another operation is in progress";
constexpr auto kNoOperationInProgress = "No operation is in progress";
+constexpr auto kMetricsSetBeforeRestore = "Expected metrics to be 0 prior to restore";
constexpr auto kTotalOps = "countReshardingOperations";
constexpr auto kSuccessfulOps = "countReshardingSuccessful";
@@ -564,16 +565,10 @@ void ReshardingMetrics::onDocumentsCopied(int64_t documents, int64_t bytes) noex
invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
- onDocumentsCopiedForCurrentOp(documents, bytes);
- _cumulativeOp->documentsCopied += documents;
- _cumulativeOp->bytesCopied += bytes;
-}
-
-void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
-
_currentOp->documentsCopied += documents;
_currentOp->bytesCopied += bytes;
+ _cumulativeOp->documentsCopied += documents;
+ _cumulativeOp->bytesCopied += bytes;
}
void ReshardingMetrics::gotInserts(int n) noexcept {
@@ -654,14 +649,8 @@ void ReshardingMetrics::onOplogEntriesFetched(int64_t entries) noexcept {
*_currentOp->recipientState,
{RecipientStateEnum::kCloning, RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
- onOplogEntriesFetchedForCurrentOp(entries);
- _cumulativeOp->oplogEntriesFetched += entries;
-}
-
-void ReshardingMetrics::onOplogEntriesFetchedForCurrentOp(int64_t entries) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
-
_currentOp->oplogEntriesFetched += entries;
+ _cumulativeOp->oplogEntriesFetched += entries;
}
void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
@@ -672,14 +661,24 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
invariant(checkState(*_currentOp->recipientState,
{RecipientStateEnum::kApplying, RecipientStateEnum::kError}));
- onOplogEntriesAppliedForCurrentOp(entries);
+ _currentOp->oplogEntriesApplied += entries;
_cumulativeOp->oplogEntriesApplied += entries;
}
-void ReshardingMetrics::onOplogEntriesAppliedForCurrentOp(int64_t entries) noexcept {
+void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied,
+ int64_t documentBytesCopied,
+ int64_t oplogEntriesFetched,
+ int64_t oplogEntriesApplied) noexcept {
invariant(_currentOp, kNoOperationInProgress);
-
- _currentOp->oplogEntriesApplied += entries;
+ invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
+
+ _currentOp->documentsCopied = documentCountCopied;
+ _currentOp->bytesCopied = documentBytesCopied;
+ _currentOp->oplogEntriesFetched = oplogEntriesFetched;
+ _currentOp->oplogEntriesApplied = oplogEntriesApplied;
}
void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index 24ab1a9a38a..480dd25ed8a 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -84,8 +84,6 @@ public:
void setDocumentsToCopyForCurrentOp(int64_t documents, int64_t bytes) noexcept;
// Allows updating metrics on "documents to copy" so long as the recipient is in cloning state.
void onDocumentsCopied(int64_t documents, int64_t bytes) noexcept;
- // Allows updating metrics on "documents to copy".
- void onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept;
// Allows updating metrics on "opcounters";
void gotInserts(int n) noexcept;
@@ -116,9 +114,12 @@ public:
// Allows updating "oplog entries to apply" metrics when the recipient is in applying state.
void onOplogEntriesFetched(int64_t entries) noexcept;
// Allows restoring "oplog entries to apply" metrics.
- void onOplogEntriesFetchedForCurrentOp(int64_t entries) noexcept;
void onOplogEntriesApplied(int64_t entries) noexcept;
- void onOplogEntriesAppliedForCurrentOp(int64_t entries) noexcept;
+
+ void restoreForCurrentOp(int64_t documentCountCopied,
+ int64_t documentBytesCopied,
+ int64_t oplogEntriesFetched,
+ int64_t oplogEntriesApplied) noexcept;
// Allows tracking writes during a critical section when the donor's state is either of
// "donating-oplog-entries" or "blocking-writes".
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 938fbefb2aa..380fb5c5616 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -74,6 +74,7 @@ MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint);
MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientBeforeCloning);
MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringCloning);
MONGO_FAIL_POINT_DEFINE(reshardingPauseRecipientDuringOplogApplication);
+MONGO_FAIL_POINT_DEFINE(reshardingOpCtxKilledWhileRestoringMetrics);
MONGO_FAIL_POINT_DEFINE(reshardingRecipientFailsAfterTransitionToCloning);
namespace {
@@ -168,25 +169,28 @@ ExecutorFuture<void>
ReshardingRecipientService::RecipientStateMachine::_runUntilStrictConsistencyOrErrored(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) noexcept {
- return resharding::WithAutomaticRetry([this, executor, abortToken] {
- return ExecutorFuture(**executor)
- .then([this, executor, abortToken] {
- return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
- executor, abortToken);
- })
- .then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); })
- .then([this, executor, abortToken] {
- return _cloneThenTransitionToApplying(executor, abortToken);
- })
- .then([this, executor, abortToken] {
- return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
- executor, abortToken);
- });
- })
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor, abortToken](const auto& factory) {
+ return ExecutorFuture(**executor)
+ .then([this, executor, abortToken, &factory] {
+ return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
+ executor, abortToken, factory);
+ })
+ .then([this, &factory] {
+ _createTemporaryReshardingCollectionThenTransitionToCloning(factory);
+ })
+ .then([this, executor, abortToken, &factory] {
+ return _cloneThenTransitionToApplying(executor, abortToken, factory);
+ })
+ .then([this, executor, abortToken, &factory] {
+ return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
+ executor, abortToken, factory);
+ });
+ })
.onTransientError([](const Status& status) {
LOGV2(5551100,
"Recipient _runUntilStrictConsistencyOrErrored encountered transient error",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([abortToken](const Status& status) { return status.isOK(); })
@@ -200,23 +204,24 @@ ReshardingRecipientService::RecipientStateMachine::_runUntilStrictConsistencyOrE
"Resharding operation recipient state machine failed",
"namespace"_attr = _metadata.getSourceNss(),
"reshardingUUID"_attr = _metadata.getReshardingUUID(),
- "error"_attr = status);
-
- return resharding::WithAutomaticRetry([this, status] {
- // It is illegal to transition into kError if the state has already surpassed
- // kStrictConsistency.
- invariant(_recipientCtx.getState() < RecipientStateEnum::kStrictConsistency);
- _transitionToError(status);
-
- // Intentionally swallow the error - by transitioning to kError, the
- // recipient effectively recovers from encountering the error and
- // should continue running in the future chain.
- })
+ "error"_attr = redact(status));
+
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, status](const auto& factory) {
+ // It is illegal to transition into kError if the state has already surpassed
+ // kStrictConsistency.
+ invariant(_recipientCtx.getState() < RecipientStateEnum::kStrictConsistency);
+ _transitionToError(status, factory);
+
+ // Intentionally swallow the error - by transitioning to kError, the
+ // recipient effectively recovers from encountering the error and
+ // should continue running in the future chain.
+ })
.onTransientError([](const Status& status) {
LOGV2(5551104,
"Recipient _runUntilStrictConsistencyOrErrored encountered transient "
"error while transitioning to state kError",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& retryStatus) { return retryStatus.isOK(); })
@@ -248,15 +253,16 @@ ReshardingRecipientService::RecipientStateMachine::_notifyCoordinatorAndAwaitDec
return ExecutorFuture(**executor);
}
- return resharding::WithAutomaticRetry([this, executor] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return _updateCoordinator(opCtx.get(), executor);
- })
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor](const auto& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
+ return _updateCoordinator(opCtx.get(), executor, factory);
+ })
.onTransientError([](const Status& status) {
LOGV2(5551102,
"Transient error while notifying coordinator of recipient state for the "
"coordinator's decision",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& status) { return status.isOK(); })
@@ -271,70 +277,70 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_finishR
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& stepdownToken,
bool aborted) noexcept {
- return resharding::WithAutomaticRetry([this, executor, aborted, stepdownToken] {
- return ExecutorFuture<void>(**executor)
- .then([this, executor, aborted, stepdownToken] {
- if (aborted) {
- return future_util::withCancellation(
- _dataReplicationQuiesced.thenRunOn(**executor), stepdownToken)
- .thenRunOn(**executor)
- .onError([](Status status) {
- // Wait for all of the data replication components to halt. We
- // ignore any errors because resharding is known to have failed
- // already.
- return Status::OK();
- });
- } else {
- _renameTemporaryReshardingCollection();
- return ExecutorFuture<void>(**executor, Status::OK());
- }
- })
- .then([this, aborted] {
- // It is safe to drop the oplog collections once either (1) the
- // collection is renamed or (2) the operation is aborting.
- invariant(_recipientCtx.getState() >=
- RecipientStateEnum::kStrictConsistency ||
- aborted);
- _cleanupReshardingCollections(aborted);
- })
- .then([this, aborted] {
- if (_recipientCtx.getState() != RecipientStateEnum::kDone) {
- // If a failover occured before removing the recipient document, the
- // recipient could already be in state done.
- _transitionState(RecipientStateEnum::kDone);
- }
-
- if (!_isAlsoDonor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-
- _externalState->clearFilteringMetadata(opCtx.get());
-
- RecoverableCriticalSectionService::get(opCtx.get())
- ->releaseRecoverableCriticalSection(
- opCtx.get(),
- _metadata.getSourceNss(),
- _critSecReason,
- ShardingCatalogClient::kLocalWriteConcern);
-
- _metrics()->leaveCriticalSection(getCurrentTime());
- }
- })
- .then([this, executor] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return _updateCoordinator(opCtx.get(), executor);
- })
- .then([this, aborted] {
- {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- removeRecipientDocFailpoint.pauseWhileSet(opCtx.get());
- }
- _removeRecipientDocument(aborted);
- });
- })
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor, aborted, stepdownToken](const auto& factory) {
+ return ExecutorFuture<void>(**executor)
+ .then([this, executor, aborted, stepdownToken, &factory] {
+ if (aborted) {
+ return future_util::withCancellation(
+ _dataReplicationQuiesced.thenRunOn(**executor), stepdownToken)
+ .thenRunOn(**executor)
+ .onError([](Status status) {
+ // Wait for all of the data replication components to halt. We
+ // ignore any errors because resharding is known to have failed
+ // already.
+ return Status::OK();
+ });
+ } else {
+ _renameTemporaryReshardingCollection(factory);
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+ })
+ .then([this, aborted, &factory] {
+ // It is safe to drop the oplog collections once either (1) the
+ // collection is renamed or (2) the operation is aborting.
+ invariant(_recipientCtx.getState() >= RecipientStateEnum::kStrictConsistency ||
+ aborted);
+ _cleanupReshardingCollections(aborted, factory);
+ })
+ .then([this, aborted, &factory] {
+ if (_recipientCtx.getState() != RecipientStateEnum::kDone) {
+ // If a failover occured before removing the recipient document, the
+ // recipient could already be in state done.
+ _transitionState(RecipientStateEnum::kDone, factory);
+ }
+
+ if (!_isAlsoDonor) {
+ auto opCtx = factory.makeOperationContext(&cc());
+
+ _externalState->clearFilteringMetadata(opCtx.get());
+
+ RecoverableCriticalSectionService::get(opCtx.get())
+ ->releaseRecoverableCriticalSection(
+ opCtx.get(),
+ _metadata.getSourceNss(),
+ _critSecReason,
+ ShardingCatalogClient::kLocalWriteConcern);
+
+ _metrics()->leaveCriticalSection(getCurrentTime());
+ }
+ })
+ .then([this, executor, &factory] {
+ auto opCtx = factory.makeOperationContext(&cc());
+ return _updateCoordinator(opCtx.get(), executor, factory);
+ })
+ .then([this, aborted, &factory] {
+ {
+ auto opCtx = factory.makeOperationContext(&cc());
+ removeRecipientDocFailpoint.pauseWhileSet(opCtx.get());
+ }
+ _removeRecipientDocument(aborted, factory);
+ });
+ })
.onTransientError([](const Status& status) {
LOGV2(5551103,
"Transient error while finishing resharding operation",
- "error"_attr = status);
+ "error"_attr = redact(status));
})
.onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& status) { return status.isOK(); })
@@ -367,10 +373,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
const CancellationToken& stepdownToken) noexcept {
auto abortToken = _initAbortSource(stepdownToken);
_markKilledExecutor->startup();
- _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
+ _retryingCancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
return ExecutorFuture<void>(**executor)
- .then([this] { _startMetrics(); })
+ .then([this, executor, abortToken] { return _startMetrics(executor, abortToken); })
.then([this, executor, abortToken] {
return _runUntilStrictConsistencyOrErrored(executor, abortToken);
})
@@ -378,7 +384,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
return _notifyCoordinatorAndAwaitDecision(executor, abortToken);
})
.onCompletion([this, executor, stepdownToken, abortToken](Status status) {
- _cancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor);
+ _retryingCancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor);
if (stepdownToken.isCanceled()) {
// Propagate any errors from the recipient stepping down.
return ExecutorFuture<bool>(**executor, status);
@@ -467,7 +473,8 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
invariant(_cloneTimestamp);
return ExecutorFuture(**executor);
@@ -475,23 +482,24 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return future_util::withCancellation(_allDonorsPreparedToDonate.getFuture(), abortToken)
.thenRunOn(**executor)
- .then([this, executor](
+ .then([this, executor, &factory](
ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
- _transitionToCreatingCollection(cloneDetails,
- (*executor)->now() + _minimumOperationDuration);
+ _transitionToCreatingCollection(
+ cloneDetails, (*executor)->now() + _minimumOperationDuration, factory);
_metrics()->setDocumentsToCopy(cloneDetails.approxDocumentsToCopy,
cloneDetails.approxBytesToCopy);
});
}
void ReshardingRecipientService::RecipientStateMachine::
- _createTemporaryReshardingCollectionThenTransitionToCloning() {
+ _createTemporaryReshardingCollectionThenTransitionToCloning(
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kCreatingCollection) {
return;
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
_externalState->ensureTempReshardingCollectionExistsWithIndexes(
opCtx.get(), _metadata, *_cloneTimestamp);
@@ -511,7 +519,7 @@ void ReshardingRecipientService::RecipientStateMachine::
});
}
- _transitionToCloning();
+ _transitionToCloning(factory);
}
std::unique_ptr<ReshardingDataReplicationInterface>
@@ -536,7 +544,8 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationStarted(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
const bool cloningDone = _recipientCtx.getState() > RecipientStateEnum::kCloning;
if (!_dataReplication) {
@@ -548,7 +557,7 @@ void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationSt
->runUntilStrictlyConsistent(**executor,
_recipientService->getInstanceCleanupExecutor(),
abortToken,
- *_cancelableOpCtxFactory,
+ factory,
txnCloneTime.get())
.share();
@@ -564,19 +573,20 @@ void ReshardingRecipientService::RecipientStateMachine::_ensureDataReplicationSt
ExecutorFuture<void>
ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kCloning) {
return ExecutorFuture(**executor);
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
reshardingPauseRecipientBeforeCloning.pauseWhileSet(opCtx.get());
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- _ensureDataReplicationStarted(opCtx.get(), executor, abortToken);
+ auto opCtx = factory.makeOperationContext(&cc());
+ _ensureDataReplicationStarted(opCtx.get(), executor, abortToken, factory);
}
reshardingRecipientFailsAfterTransitionToCloning.execute([&](const BSONObj& data) {
@@ -585,30 +595,31 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
});
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
reshardingPauseRecipientDuringCloning.pauseWhileSet(opCtx.get());
}
return future_util::withCancellation(_dataReplication->awaitCloningDone(), abortToken)
.thenRunOn(**executor)
- .then([this] { _transitionToApplying(); });
+ .then([this, &factory] { _transitionToApplying(factory); });
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() > RecipientStateEnum::kApplying) {
return ExecutorFuture<void>(**executor, Status::OK());
}
{
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- _ensureDataReplicationStarted(opCtx.get(), executor, abortToken);
+ auto opCtx = factory.makeOperationContext(&cc());
+ _ensureDataReplicationStarted(opCtx.get(), executor, abortToken, factory);
}
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return _updateCoordinator(opCtx.get(), executor)
+ auto opCtx = factory.makeOperationContext(&cc());
+ return _updateCoordinator(opCtx.get(), executor, factory)
.then([this, abortToken] {
{
auto opCtx = cc().makeOperationContext();
@@ -618,8 +629,8 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return future_util::withCancellation(_dataReplication->awaitStrictlyConsistent(),
abortToken);
})
- .then([this] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ .then([this, &factory] {
+ auto opCtx = factory.makeOperationContext(&cc());
for (const auto& donor : _donorShards) {
auto stashNss =
getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
@@ -629,9 +640,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
!stashColl || stashColl->isEmpty(opCtx.get()));
}
})
- .then([this] {
+ .then([this, &factory] {
if (!_isAlsoDonor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
RecoverableCriticalSectionService::get(opCtx.get())
->acquireRecoverableCriticalSectionBlockWrites(
opCtx.get(),
@@ -642,13 +653,14 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_metrics()->enterCriticalSection(getCurrentTime());
}
- _transitionToStrictConsistency();
- _writeStrictConsistencyOplog();
+ _transitionToStrictConsistency(factory);
+ _writeStrictConsistencyOplog(factory);
});
}
-void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyOplog() {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyOplog(
+ const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
auto rawOpCtx = opCtx.get();
auto generateOplogEntry = [&]() {
@@ -683,13 +695,14 @@ void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyO
});
}
-void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection() {
+void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardingCollection(
+ const CancelableOperationContextFactory& factory) {
if (_recipientCtx.getState() == RecipientStateEnum::kDone) {
return;
}
if (!_isAlsoDonor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
RecoverableCriticalSectionService::get(opCtx.get())
->promoteRecoverableCriticalSectionToBlockAlsoReads(
@@ -703,8 +716,8 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi
}
void ReshardingRecipientService::RecipientStateMachine::_cleanupReshardingCollections(
- bool aborted) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ bool aborted, const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
resharding::data_copy::ensureOplogCollectionsDropped(
opCtx.get(), _metadata.getReshardingUUID(), _metadata.getSourceUUID(), _donorShards);
@@ -715,19 +728,20 @@ void ReshardingRecipientService::RecipientStateMachine::_cleanupReshardingCollec
}
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
- RecipientStateEnum newState) {
+ RecipientStateEnum newState, const CancelableOperationContextFactory& factory) {
invariant(newState != RecipientStateEnum::kCreatingCollection &&
newState != RecipientStateEnum::kError);
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(newState);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
RecipientShardContext&& newRecipientCtx,
boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime) {
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory) {
invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp);
// For logging purposes.
@@ -735,7 +749,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
auto newState = newRecipientCtx.getState();
_updateRecipientDocument(
- std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime));
+ std::move(newRecipientCtx), std::move(cloneDetails), std::move(configStartTime), factory);
_metrics()->setRecipientState(newState);
@@ -750,42 +764,49 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection(
ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails,
- const boost::optional<mongo::Date_t> startConfigTxnCloneTime) {
+ const boost::optional<mongo::Date_t> startConfigTxnCloneTime,
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection);
- _transitionState(
- std::move(newRecipientCtx), std::move(cloneDetails), std::move(startConfigTxnCloneTime));
+ _transitionState(std::move(newRecipientCtx),
+ std::move(cloneDetails),
+ std::move(startConfigTxnCloneTime),
+ factory);
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning() {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning(
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCloning);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
_metrics()->startCopyingDocuments(getCurrentTime());
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying() {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kApplying);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
auto currentTime = getCurrentTime();
_metrics()->endCopyingDocuments(currentTime);
_metrics()->startApplyingOplogEntries(currentTime);
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency() {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency(
+ const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
auto currentTime = getCurrentTime();
_metrics()->endApplyingOplogEntries(currentTime);
}
-void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) {
+void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
+ Status abortReason, const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kError);
emplaceTruncatedAbortReasonIfExists(newRecipientCtx, abortReason);
- _transitionState(std::move(newRecipientCtx), boost::none, boost::none);
+ _transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
}
/**
@@ -856,14 +877,16 @@ BSONObj ReshardingRecipientService::RecipientStateMachine::_makeQueryForCoordina
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateCoordinator(
- OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelableOperationContextFactory& factory) {
repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
return WaitForMajorityService::get(opCtx->getServiceContext())
.waitUntilMajority(clientOpTime, CancellationToken::uncancelable())
.thenRunOn(**executor)
- .then([this] {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ .then([this, &factory] {
+ auto opCtx = factory.makeOperationContext(&cc());
auto shardId = _externalState->myShardId(opCtx->getServiceContext());
BSONObjBuilder updateBuilder;
@@ -904,8 +927,9 @@ void ReshardingRecipientService::RecipientStateMachine::commit() {
void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
RecipientShardContext&& newRecipientCtx,
boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
@@ -957,8 +981,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
}
}
-void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument(bool aborted) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument(
+ bool aborted, const CancelableOperationContextFactory& factory) {
+ auto opCtx = factory.makeOperationContext(&cc());
const auto& nss = NamespaceString::kRecipientReshardingOperationsNamespace;
writeConflictRetry(
@@ -1004,31 +1029,53 @@ ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics()
return ReshardingMetrics::get(cc().getServiceContext());
}
-void ReshardingRecipientService::RecipientStateMachine::_startMetrics() {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
_metrics()->onStepUp(ReshardingMetrics::Role::kRecipient);
- _restoreMetrics();
- } else {
- _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
+ return _restoreMetricsWithRetry(executor, abortToken);
}
+ _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
+ return ExecutorFuture<void>(**executor);
}
-void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
_metrics()->setRecipientState(_recipientCtx.getState());
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry(
+ [this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); })
+ .onTransientError([](const Status& status) {
+ LOGV2(
+ 5992700, "Transient error while restoring metrics", "error"_attr = redact(status));
+ })
+ .onUnrecoverableError([](const Status& status) {})
+ .until<Status>([](const Status& status) { return status.isOK(); })
+ .on(**executor, abortToken);
+}
+
+void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
+ const CancelableOperationContextFactory& factory) {
+ int64_t documentCountCopied = 0;
+ int64_t documentBytesCopied = 0;
+ int64_t oplogEntriesFetched = 0;
+ int64_t oplogEntriesApplied = 0;
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto opCtx = factory.makeOperationContext(&cc());
{
AutoGetCollection tempReshardingColl(
opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS);
if (tempReshardingColl) {
- int64_t bytesCopied = tempReshardingColl->dataSize(opCtx.get());
- int64_t documentsCopied = tempReshardingColl->numRecords(opCtx.get());
- if (bytesCopied > 0) {
- _metrics()->onDocumentsCopiedForCurrentOp(documentsCopied, bytesCopied);
- }
+ documentBytesCopied = tempReshardingColl->dataSize(opCtx.get());
+ documentCountCopied = tempReshardingColl->numRecords(opCtx.get());
}
}
+ reshardingOpCtxKilledWhileRestoringMetrics.execute(
+ [&opCtx](const BSONObj& data) { opCtx->markKilled(); });
+
for (const auto& donor : _donorShards) {
{
AutoGetCollection oplogBufferColl(
@@ -1036,9 +1083,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {
getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()),
MODE_IS);
if (oplogBufferColl) {
- int64_t recordsFetched = oplogBufferColl->numRecords(opCtx.get());
- if (recordsFetched > 0)
- _metrics()->onOplogEntriesFetchedForCurrentOp(recordsFetched);
+ oplogEntriesFetched += oplogBufferColl->numRecords(opCtx.get());
}
}
@@ -1056,13 +1101,16 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {
result);
if (!result.isEmpty()) {
- _metrics()->onOplogEntriesAppliedForCurrentOp(
+ oplogEntriesApplied +=
result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName)
- .Long());
+ .Long();
}
}
}
}
+
+ _metrics()->restoreForCurrentOp(
+ documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied);
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e88d50a2330..3e9a56b4646 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -32,6 +32,7 @@
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/s/resharding/recipient_document_gen.h"
#include "mongo/db/s/resharding/resharding_data_replication.h"
+#include "mongo/db/s/resharding/resharding_future_util.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/s/resharding/type_collection_fields_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -176,56 +177,67 @@ private:
// The following functions correspond to the actions to take at a particular recipient state.
ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
- void _createTemporaryReshardingCollectionThenTransitionToCloning();
+ void _createTemporaryReshardingCollectionThenTransitionToCloning(
+ const CancelableOperationContextFactory& factory);
ExecutorFuture<void> _cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
- void _writeStrictConsistencyOplog();
+ void _writeStrictConsistencyOplog(const CancelableOperationContextFactory& factory);
- void _renameTemporaryReshardingCollection();
+ void _renameTemporaryReshardingCollection(const CancelableOperationContextFactory& factory);
- void _cleanupReshardingCollections(bool aborted);
+ void _cleanupReshardingCollections(bool aborted,
+ const CancelableOperationContextFactory& factory);
// Transitions the on-disk and in-memory state to 'newState'.
- void _transitionState(RecipientStateEnum newState);
+ void _transitionState(RecipientStateEnum newState,
+ const CancelableOperationContextFactory& factory);
void _transitionState(RecipientShardContext&& newRecipientCtx,
boost::optional<CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime);
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory);
// The following functions transition the on-disk and in-memory state to the named state.
void _transitionToCreatingCollection(CloneDetails cloneDetails,
- boost::optional<mongo::Date_t> startConfigTxnCloneTime);
+ boost::optional<mongo::Date_t> startConfigTxnCloneTime,
+ const CancelableOperationContextFactory& factory);
- void _transitionToCloning();
+ void _transitionToCloning(const CancelableOperationContextFactory& factory);
- void _transitionToApplying();
+ void _transitionToApplying(const CancelableOperationContextFactory& factory);
- void _transitionToStrictConsistency();
+ void _transitionToStrictConsistency(const CancelableOperationContextFactory& factory);
- void _transitionToError(Status abortReason);
+ void _transitionToError(Status abortReason, const CancelableOperationContextFactory& factory);
BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, RecipientStateEnum newState);
ExecutorFuture<void> _updateCoordinator(
- OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ OperationContext* opCtx,
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelableOperationContextFactory& factory);
// Updates the mutable portion of the on-disk and in-memory recipient document with
// 'newRecipientCtx', 'fetchTimestamp and 'donorShards'.
void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx,
boost::optional<CloneDetails>&& cloneDetails,
- boost::optional<mongo::Date_t> configStartTime);
+ boost::optional<mongo::Date_t> configStartTime,
+ const CancelableOperationContextFactory& factory);
// Removes the local recipient document from disk.
- void _removeRecipientDocument(bool aborted);
+ void _removeRecipientDocument(bool aborted, const CancelableOperationContextFactory& factory);
std::unique_ptr<ReshardingDataReplicationInterface> _makeDataReplication(
OperationContext* opCtx, bool cloningDone);
@@ -233,14 +245,20 @@ private:
void _ensureDataReplicationStarted(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken);
+ const CancellationToken& abortToken,
+ const CancelableOperationContextFactory& factory);
ReshardingMetrics* _metrics() const;
- void _startMetrics();
+ ExecutorFuture<void> _startMetrics(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
// Restore metrics using the persisted metrics after stepping up.
- void _restoreMetrics();
+ ExecutorFuture<void> _restoreMetricsWithRetry(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
+ void _restoreMetrics(const CancelableOperationContextFactory& factory);
// Initializes the _abortSource and generates a token from it to return back the caller.
//
@@ -271,7 +289,8 @@ private:
// CancelableOperationContext must have a thread that is always available to it to mark its
// opCtx as killed when the cancelToken has been cancelled.
const std::shared_ptr<ThreadPool> _markKilledExecutor;
- boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;
+ boost::optional<resharding::RetryingCancelableOperationContextFactory>
+ _retryingCancelableOpCtxFactory;
const ReshardingDataReplicationFactory _dataReplicationFactory;
SharedSemiFuture<void> _dataReplicationQuiesced;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 5d69b790321..b93d43107f7 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -432,6 +432,49 @@ TEST_F(ReshardingRecipientServiceTest, StepDownStepUpEachTransition) {
}
}
+TEST_F(ReshardingRecipientServiceTest, OpCtxKilledWhileRestoringMetrics) {
+ for (bool isAlsoDonor : {false, true}) {
+ LOGV2(5992701,
+ "Running case",
+ "test"_attr = _agent.getTestName(),
+ "isAlsoDonor"_attr = isAlsoDonor);
+
+ // Initialize recipient.
+ auto doc = makeStateDocument(isAlsoDonor);
+ auto instanceId =
+ BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
+ auto opCtx = makeOperationContext();
+ if (isAlsoDonor) {
+ createSourceCollection(opCtx.get(), doc);
+ }
+ RecipientStateMachine::insertStateDocument(opCtx.get(), doc);
+ auto recipient = RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+
+ // In order to restore metrics, metrics need to exist in the first place, so put the
+ // recipient in the cloning state, then step down.
+ PauseDuringStateTransitions stateTransitionsGuard{controller(),
+ RecipientStateEnum::kCloning};
+ notifyToStartCloning(opCtx.get(), *recipient, doc);
+ stateTransitionsGuard.wait(RecipientStateEnum::kCloning);
+ stepDown();
+ stateTransitionsGuard.unset(RecipientStateEnum::kCloning);
+ recipient.reset();
+
+ // Enable failpoint and step up.
+ auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics");
+ fp->setMode(FailPoint::nTimes, 1);
+ stepUp(opCtx.get());
+
+ // After the failpoint is disabled, the operation should succeed.
+ auto maybeRecipient = RecipientStateMachine::lookup(opCtx.get(), _service, instanceId);
+ ASSERT_TRUE(bool(maybeRecipient));
+ recipient = *maybeRecipient;
+ notifyReshardingCommitting(opCtx.get(), *recipient, doc);
+ ASSERT_OK(recipient->getCompletionFuture().getNoThrow());
+ checkStateDocumentRemoved(opCtx.get());
+ }
+}
+
DEATH_TEST_REGEX_F(ReshardingRecipientServiceTest, CommitFn, "4457001.*tripwire") {
auto doc = makeStateDocument(false /* isAlsoDonor */);
auto opCtx = makeOperationContext();
@@ -767,7 +810,6 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
default:
break;
}
-
// Step down before the transition to state can complete.
stateTransitionsGuard.wait(state);
if (state == RecipientStateEnum::kStrictConsistency) {
@@ -801,6 +843,12 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
ErrorCodes::InterruptedDueToReplStateChange);
prevState = state;
+ if (state == RecipientStateEnum::kApplying ||
+ state == RecipientStateEnum::kStrictConsistency) {
+ // If metrics are being verified in the next pass, ensure a retry does not alter values.
+ auto fp = globalFailPointRegistry().find("reshardingOpCtxKilledWhileRestoringMetrics");
+ fp->setMode(FailPoint::nTimes, 1);
+ }
recipient.reset();
if (state != RecipientStateEnum::kDone)