diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/rename_collection_participant_service.cpp | 86 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_participant_service.h | 3 |
2 files changed, 54 insertions, 35 deletions
diff --git a/src/mongo/db/s/rename_collection_participant_service.cpp b/src/mongo/db/s/rename_collection_participant_service.cpp index 62da2326850..4d9efb66056 100644 --- a/src/mongo/db/s/rename_collection_participant_service.cpp +++ b/src/mongo/db/s/rename_collection_participant_service.cpp @@ -46,11 +46,14 @@ #include "mongo/logv2/log.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/grid.h" +#include "mongo/util/future_util.h" namespace mongo { namespace { +const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); + /* * Drop the collection locally and clear stale metadata from cache collections. */ @@ -244,6 +247,47 @@ SemiFuture<void> RenameParticipantInstance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { return ExecutorFuture<void>(**executor) + .then([this, executor, token, anchor = shared_from_this()] { + return AsyncTry([this, executor, token] { return _runImpl(executor, token); }) + .until([this, token](Status status) { return status.isOK() || token.isCanceled(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, CancellationToken::uncancelable()); + }) + .onCompletion([this, anchor = shared_from_this()](const Status& status) { + if (!status.isOK()) { + // The token gets canceled in case of stepdown/shutdown + _invalidateFutures(status); + return; + } + + try { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + _removeStateDocument(opCtx); + } catch (DBException& ex) { + LOGV2_WARNING(5515108, + "Failed to remove rename participant state document", + "error"_attr = redact(ex)); + ex.addContext("Failed to remove rename participant state document"_sd); + stdx::lock_guard<Latch> lg(_mutex); + if (!_unblockCRUDPromise.getFuture().isReady()) { + _unblockCRUDPromise.setError(ex.toStatus()); + } + throw; + } + + stdx::lock_guard<Latch> lg(_mutex); + if (!_unblockCRUDPromise.getFuture().isReady()) { + _unblockCRUDPromise.emplaceValue(); + } + }) + .semi(); +} + +SemiFuture<void> RenameParticipantInstance::_runImpl( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept { + return ExecutorFuture<void>(**executor) .then(_executePhase( Phase::kBlockCRUDAndSnapshotRangeDeletions, [this, anchor = shared_from_this()] { @@ -347,41 +391,13 @@ SemiFuture<void> RenameParticipantInstance::run( LOGV2(5515107, "CRUD unblocked", "fromNs"_attr = fromNss(), "toNs"_attr = toNss()); })) - .onCompletion([this, anchor = shared_from_this()](const Status& status) { - if (!status.isOK()) { - if (!status.isA<ErrorCategory::NotPrimaryError>() && - !status.isA<ErrorCategory::ShutdownError>()) { - LOGV2_ERROR(5515109, - "Error executing rename collection participant", - "fromNs"_attr = fromNss(), - "toNs"_attr = toNss(), - "error"_attr = redact(status)); - } - - _invalidateFutures(status); - return; - } - - try { - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - _removeStateDocument(opCtx); - } catch (DBException& ex) { - LOGV2_WARNING(5515108, - "Failed to remove rename participant state document", - "error"_attr = redact(ex)); - ex.addContext("Failed to remove rename participant state document"_sd); - stdx::lock_guard<Latch> lg(_mutex); - if (!_unblockCRUDPromise.getFuture().isReady()) { - _unblockCRUDPromise.setError(ex.toStatus()); - } - throw; - } - - stdx::lock_guard<Latch> lg(_mutex); - if (!_unblockCRUDPromise.getFuture().isReady()) { - _unblockCRUDPromise.emplaceValue(); - } + .onError([this, anchor = shared_from_this()](const Status& status) { + LOGV2_ERROR(6372200, + "Error executing rename collection participant. Going to be retried.", + "fromNs"_attr = fromNss(), + "toNs"_attr = toNss(), + "error"_attr = redact(status)); + return status; }) .semi(); } diff --git a/src/mongo/db/s/rename_collection_participant_service.h b/src/mongo/db/s/rename_collection_participant_service.h index 729f484ac66..965bd93e332 100644 --- a/src/mongo/db/s/rename_collection_participant_service.h +++ b/src/mongo/db/s/rename_collection_participant_service.h @@ -142,6 +142,9 @@ private: SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override final; + SemiFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept; + void interrupt(Status status) noexcept override final; template <typename Func> |