summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2021-04-20 15:36:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-20 16:05:04 +0000
commit018063d3f7e77781aebd95de9f992aa21d5cb299 (patch)
tree5b8c918445937a003b3725fc21f17f12f083e28b /src/mongo/db/s/resharding/resharding_txn_cloner.cpp
parenta21ee446737e41b4ef9570699b2005d4634374b7 (diff)
downloadmongo-018063d3f7e77781aebd95de9f992aa21d5cb299.tar.gz
SERVER-56158 Add resharding::WithAutomaticRetry() util around AsyncTry.
Replaces direct usages of AsyncTry in ReshardingCollectionCloner, ReshardingTxnCloner, and ReshardingOplogBatchApplier.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_txn_cloner.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner.cpp72
1 files changed, 26 insertions, 46 deletions
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
index 496efbfa86f..1c52e35c473 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
+#include "mongo/db/s/resharding/resharding_future_util.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_txn_cloner_progress_gen.h"
#include "mongo/db/s/session_catalog_migration_destination.h"
@@ -61,8 +62,6 @@
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/logv2/log.h"
-#include "mongo/logv2/redaction.h"
-#include "mongo/util/future_util.h"
namespace mongo {
@@ -164,7 +163,7 @@ SemiFuture<void> ReshardingTxnCloner::run(
auto chainCtx = std::make_shared<ChainContext>();
- return AsyncTry([this, chainCtx, factory, mongoProcessInterface_forTest] {
+ return resharding::WithAutomaticRetry([this, chainCtx, factory, mongoProcessInterface_forTest] {
if (!chainCtx->pipeline) {
auto opCtx = factory.makeOperationContext(&cc());
chainCtx->pipeline = [&]() {
@@ -247,55 +246,36 @@ SemiFuture<void> ReshardingTxnCloner::run(
chainCtx->donorRecord = boost::none;
return makeReadyFutureWith([] {}).share();
})
- .until([this, cancelToken, chainCtx, factory](Status status) {
- if (status.isOK() && chainCtx->moreToCome) {
- return false;
- }
-
- if (chainCtx->pipeline) {
+ .onTransientError([this](const Status& status) {
+ LOGV2(5461600,
+ "Transient error while cloning config.transactions collection",
+ "sourceId"_attr = _sourceId,
+ "readTimestamp"_attr = _fetchTimestamp,
+ "error"_attr = redact(status));
+ })
+ .onUnrecoverableError([this](const Status& status) {
+ LOGV2_ERROR(
+ 5461601,
+ "Operation-fatal error for resharding while cloning config.transactions collection",
+ "sourceId"_attr = _sourceId,
+ "readTimestamp"_attr = _fetchTimestamp,
+ "error"_attr = redact(status));
+ })
+ .until([chainCtx, factory](const Status& status) {
+ if (!status.isOK() && chainCtx->pipeline) {
auto opCtx = factory.makeOperationContext(&cc());
chainCtx->pipeline->dispose(opCtx.get());
chainCtx->pipeline.reset();
}
- if (status.isA<ErrorCategory::CancellationError>() ||
- status.isA<ErrorCategory::NotPrimaryError>()) {
- // Cancellation and NotPrimary errors indicate the primary-only service Instance
- // will be shut down or is shutting down now - provided the cancelToken is also
- // canceled. Otherwise, the errors may have originated from a remote response rather
- // than the shard itself.
- //
- // Don't retry when primary-only service Instance is shutting down.
- return !cancelToken.isCanceled();
- }
-
- if (status.isA<ErrorCategory::RetriableError>() ||
- status.isA<ErrorCategory::CursorInvalidatedError>() ||
- status == ErrorCodes::Interrupted) {
- // Do retry on any other types of retryable errors though. Also retry on errors from
- // stray killCursors and killOp commands being run.
- LOGV2(5461600,
- "Transient error while cloning config.transactions collection",
- "sourceId"_attr = _sourceId,
- "fetchTimestamp"_attr = _fetchTimestamp,
- "error"_attr = redact(status));
- return false;
- }
-
- if (!status.isOK()) {
- LOGV2(5461601,
- "Operation-fatal error for resharding while cloning config.transactions"
- " collection",
- "sourceId"_attr = _sourceId,
- "fetchTimestamp"_attr = _fetchTimestamp,
- "error"_attr = redact(status));
- }
-
- return true;
+ return status.isOK() && !chainCtx->moreToCome;
})
- .on(executor, cancelToken)
- .thenRunOn(cleanupExecutor)
- .onCompletion([this, chainCtx](Status status) {
+ .on(std::move(executor), std::move(cancelToken))
+ .thenRunOn(std::move(cleanupExecutor))
+ // It is unsafe to capture `this` once the task is running on the cleanupExecutor because
+ // RecipientStateMachine, along with its ReshardingTxnCloner member, may have already been
+ // destructed.
+ .onCompletion([chainCtx](Status status) {
if (chainCtx->pipeline) {
// Guarantee the pipeline is always cleaned up - even upon cancellation.
auto client =