diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-04-20 15:36:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-20 16:05:04 +0000 |
commit | 018063d3f7e77781aebd95de9f992aa21d5cb299 (patch) | |
tree | 5b8c918445937a003b3725fc21f17f12f083e28b /src/mongo/db/s/resharding/resharding_txn_cloner.cpp | |
parent | a21ee446737e41b4ef9570699b2005d4634374b7 (diff) | |
download | mongo-018063d3f7e77781aebd95de9f992aa21d5cb299.tar.gz |
SERVER-56158 Add resharding::WithAutomaticRetry() util around AsyncTry.
Replaces direct usages of AsyncTry in ReshardingCollectionCloner,
ReshardingTxnCloner, and ReshardingOplogBatchApplier.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_txn_cloner.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_txn_cloner.cpp | 72 |
1 files changed, 26 insertions, 46 deletions
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp index 496efbfa86f..1c52e35c473 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp @@ -54,6 +54,7 @@ #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" +#include "mongo/db/s/resharding/resharding_future_util.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_txn_cloner_progress_gen.h" #include "mongo/db/s/session_catalog_migration_destination.h" @@ -61,8 +62,6 @@ #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_participant.h" #include "mongo/logv2/log.h" -#include "mongo/logv2/redaction.h" -#include "mongo/util/future_util.h" namespace mongo { @@ -164,7 +163,7 @@ SemiFuture<void> ReshardingTxnCloner::run( auto chainCtx = std::make_shared<ChainContext>(); - return AsyncTry([this, chainCtx, factory, mongoProcessInterface_forTest] { + return resharding::WithAutomaticRetry([this, chainCtx, factory, mongoProcessInterface_forTest] { if (!chainCtx->pipeline) { auto opCtx = factory.makeOperationContext(&cc()); chainCtx->pipeline = [&]() { @@ -247,55 +246,36 @@ SemiFuture<void> ReshardingTxnCloner::run( chainCtx->donorRecord = boost::none; return makeReadyFutureWith([] {}).share(); }) - .until([this, cancelToken, chainCtx, factory](Status status) { - if (status.isOK() && chainCtx->moreToCome) { - return false; - } - - if (chainCtx->pipeline) { + .onTransientError([this](const Status& status) { + LOGV2(5461600, + "Transient error while cloning config.transactions collection", + "sourceId"_attr = _sourceId, + "readTimestamp"_attr = _fetchTimestamp, + "error"_attr = redact(status)); + }) + .onUnrecoverableError([this](const Status& status) { + LOGV2_ERROR( + 5461601, + "Operation-fatal error for resharding while cloning config.transactions collection", + "sourceId"_attr = _sourceId, + "readTimestamp"_attr = _fetchTimestamp, + "error"_attr = redact(status)); + }) + .until([chainCtx, factory](const Status& status) { + if (!status.isOK() && chainCtx->pipeline) { auto opCtx = factory.makeOperationContext(&cc()); chainCtx->pipeline->dispose(opCtx.get()); chainCtx->pipeline.reset(); } - if (status.isA<ErrorCategory::CancellationError>() || - status.isA<ErrorCategory::NotPrimaryError>()) { - // Cancellation and NotPrimary errors indicate the primary-only service Instance - // will be shut down or is shutting down now - provided the cancelToken is also - // canceled. Otherwise, the errors may have originated from a remote response rather - // than the shard itself. - // - // Don't retry when primary-only service Instance is shutting down. - return !cancelToken.isCanceled(); - } - - if (status.isA<ErrorCategory::RetriableError>() || - status.isA<ErrorCategory::CursorInvalidatedError>() || - status == ErrorCodes::Interrupted) { - // Do retry on any other types of retryable errors though. Also retry on errors from - // stray killCursors and killOp commands being run. - LOGV2(5461600, - "Transient error while cloning config.transactions collection", - "sourceId"_attr = _sourceId, - "fetchTimestamp"_attr = _fetchTimestamp, - "error"_attr = redact(status)); - return false; - } - - if (!status.isOK()) { - LOGV2(5461601, - "Operation-fatal error for resharding while cloning config.transactions" - " collection", - "sourceId"_attr = _sourceId, - "fetchTimestamp"_attr = _fetchTimestamp, - "error"_attr = redact(status)); - } - - return true; + return status.isOK() && !chainCtx->moreToCome; }) - .on(executor, cancelToken) - .thenRunOn(cleanupExecutor) - .onCompletion([this, chainCtx](Status status) { + .on(std::move(executor), std::move(cancelToken)) + .thenRunOn(std::move(cleanupExecutor)) + // It is unsafe to capture `this` once the task is running on the cleanupExecutor because + // RecipientStateMachine, along with its ReshardingTxnCloner member, may have already been + // destructed. + .onCompletion([chainCtx](Status status) { if (chainCtx->pipeline) { // Guarantee the pipeline is always cleaned up - even upon cancellation. auto client = |