summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/rename_collection_participant_service.cpp86
-rw-r--r--src/mongo/db/s/rename_collection_participant_service.h3
2 files changed, 54 insertions, 35 deletions
diff --git a/src/mongo/db/s/rename_collection_participant_service.cpp b/src/mongo/db/s/rename_collection_participant_service.cpp
index 62da2326850..4d9efb66056 100644
--- a/src/mongo/db/s/rename_collection_participant_service.cpp
+++ b/src/mongo/db/s/rename_collection_participant_service.cpp
@@ -46,11 +46,14 @@
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/grid.h"
+#include "mongo/util/future_util.h"
namespace mongo {
namespace {
+const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+
/*
* Drop the collection locally and clear stale metadata from cache collections.
*/
@@ -244,6 +247,47 @@ SemiFuture<void> RenameParticipantInstance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
+ .then([this, executor, token, anchor = shared_from_this()] {
+ return AsyncTry([this, executor, token] { return _runImpl(executor, token); })
+ .until([this, token](Status status) { return status.isOK() || token.isCanceled(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, CancellationToken::uncancelable());
+ })
+ .onCompletion([this, anchor = shared_from_this()](const Status& status) {
+ if (!status.isOK()) {
+ // The token gets canceled in case of stepdown/shutdown
+ _invalidateFutures(status);
+ return;
+ }
+
+ try {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ _removeStateDocument(opCtx);
+ } catch (DBException& ex) {
+ LOGV2_WARNING(5515108,
+ "Failed to remove rename participant state document",
+ "error"_attr = redact(ex));
+ ex.addContext("Failed to remove rename participant state document"_sd);
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_unblockCRUDPromise.getFuture().isReady()) {
+ _unblockCRUDPromise.setError(ex.toStatus());
+ }
+ throw;
+ }
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_unblockCRUDPromise.getFuture().isReady()) {
+ _unblockCRUDPromise.emplaceValue();
+ }
+ })
+ .semi();
+}
+
+SemiFuture<void> RenameParticipantInstance::_runImpl(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
.then(_executePhase(
Phase::kBlockCRUDAndSnapshotRangeDeletions,
[this, anchor = shared_from_this()] {
@@ -347,41 +391,13 @@ SemiFuture<void> RenameParticipantInstance::run(
LOGV2(5515107, "CRUD unblocked", "fromNs"_attr = fromNss(), "toNs"_attr = toNss());
}))
- .onCompletion([this, anchor = shared_from_this()](const Status& status) {
- if (!status.isOK()) {
- if (!status.isA<ErrorCategory::NotPrimaryError>() &&
- !status.isA<ErrorCategory::ShutdownError>()) {
- LOGV2_ERROR(5515109,
- "Error executing rename collection participant",
- "fromNs"_attr = fromNss(),
- "toNs"_attr = toNss(),
- "error"_attr = redact(status));
- }
-
- _invalidateFutures(status);
- return;
- }
-
- try {
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- _removeStateDocument(opCtx);
- } catch (DBException& ex) {
- LOGV2_WARNING(5515108,
- "Failed to remove rename participant state document",
- "error"_attr = redact(ex));
- ex.addContext("Failed to remove rename participant state document"_sd);
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_unblockCRUDPromise.getFuture().isReady()) {
- _unblockCRUDPromise.setError(ex.toStatus());
- }
- throw;
- }
-
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_unblockCRUDPromise.getFuture().isReady()) {
- _unblockCRUDPromise.emplaceValue();
- }
+ .onError([this, anchor = shared_from_this()](const Status& status) {
+ LOGV2_ERROR(6372200,
+ "Error executing rename collection participant. Going to be retried.",
+ "fromNs"_attr = fromNss(),
+ "toNs"_attr = toNss(),
+ "error"_attr = redact(status));
+ return status;
})
.semi();
}
diff --git a/src/mongo/db/s/rename_collection_participant_service.h b/src/mongo/db/s/rename_collection_participant_service.h
index 729f484ac66..965bd93e332 100644
--- a/src/mongo/db/s/rename_collection_participant_service.h
+++ b/src/mongo/db/s/rename_collection_participant_service.h
@@ -142,6 +142,9 @@ private:
SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override final;
+ SemiFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept;
+
void interrupt(Status status) noexcept override final;
template <typename Func>