diff options
author | Marcos José Grillo Ramirez <marcos.grillo@mongodb.com> | 2022-01-13 13:54:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-17 14:16:03 +0000 |
commit | c7db7debed3584e36f8439ee195a489d58412383 (patch) | |
tree | 134ca2093286cf3d59248908c398fca0fd6faeb2 | |
parent | 7c1f9a4aaad7a8e58fd8cbf9bffd7465ed84feef (diff) | |
download | mongo-c7db7debed3584e36f8439ee195a489d58412383.tar.gz |
SERVER-58622 Retry removing the DDL coordinator document after the DDL is finished unless there is a stepdown
(cherry picked from commit 5fa70b4e4d6b4252fd505ab12cea771b197d2cf0)
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.cpp | 54 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.h | 1 |
4 files changed, 49 insertions, 16 deletions
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index bde0bba3a4c..c743ec7cc6b 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -72,6 +72,26 @@ ShardingDDLCoordinator::~ShardingDDLCoordinator() { invariant(_completionPromise.getFuture().isReady()); } +ExecutorFuture<bool> ShardingDDLCoordinator::_removeDocumentUntillSuccessOrStepdown( + std::shared_ptr<executor::TaskExecutor> executor) { + return AsyncTry([this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + + return StatusWith(_removeDocument(opCtx)); + }) + .until([this](const StatusWith<bool>& sw) { + // We can't rely on the instance token because after removing the document the + // CancellationSource object of the instance is lost, so the reference to the parent POS + // token is also lost, making any subsequent cancel during a stepdown unnoticeable by + // the token. + return sw.isOK() || sw.getStatus().isA<ErrorCategory::NotPrimaryError>() || + sw.getStatus().isA<ErrorCategory::ShutdownError>(); + }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(executor, CancellationToken::uncancelable()); +} + bool ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) { DBDirectClient dbClient(opCtx); auto commandResponse = dbClient.runCommand([&] { @@ -149,7 +169,6 @@ void ShardingDDLCoordinator::interrupt(Status status) { SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { - return ExecutorFuture<void>(**executor) .then([this, executor, token, anchor = shared_from_this()] { return _acquireLockAsync(executor, token, nss().db()); @@ -237,7 +256,7 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancellationToken::uncancelable()); }) - .onCompletion([this, anchor = shared_from_this()](const Status& status) { + .onCompletion([this, executor, token, anchor = shared_from_this()](const Status& status) { auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); @@ -249,16 +268,20 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas // Release the coordinator only in case the node is not stepping down or in case of // acceptable error if (!isSteppingDown || (!status.isOK() && _completeOnError)) { - try { - LOGV2(5565601, - "Releasing sharding DDL coordinator", - "coordinatorId"_attr = _coordId); + LOGV2( + 5565601, "Releasing sharding DDL coordinator", "coordinatorId"_attr = _coordId); + + auto session = metadata().getSession(); - auto session = metadata().getSession(); - const auto docWasRemoved = _removeDocument(opCtx); + try { + // We need to execute this in another executor to ensure the remove work is + // done. + const auto docWasRemoved = _removeDocumentUntillSuccessOrStepdown( + _service->getInstanceCleanupExecutor()) + .get(); if (!docWasRemoved) { - // Release the instance without interrupting it + // Release the instance without interrupting it. _service->releaseInstance( BSON(ShardingDDLCoordinatorMetadata::kIdFieldName << _coordId.toBSON()), Status::OK()); @@ -269,13 +292,12 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas // discarded. SessionCache::get(opCtx)->release(*session); } - } catch (DBException& ex) { - static constexpr auto errMsg = "Failed to release sharding DDL coordinator"; - LOGV2_WARNING(5565605, - errMsg, - "coordinatorId"_attr = _coordId, - "error"_attr = redact(ex)); - completionStatus = ex.toStatus(errMsg); + } catch (const DBException& ex) { + completionStatus = ex.toStatus(); + isSteppingDown = completionStatus.isA<ErrorCategory::NotPrimaryError>() || + completionStatus.isA<ErrorCategory::ShutdownError>() || + completionStatus.isA<ErrorCategory::CancellationError>(); + dassert(isSteppingDown); } } diff --git a/src/mongo/db/s/sharding_ddl_coordinator.h b/src/mongo/db/s/sharding_ddl_coordinator.h index d554baa0426..e549a13f2a4 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.h +++ b/src/mongo/db/s/sharding_ddl_coordinator.h @@ -204,6 +204,10 @@ private: void interrupt(Status status) override final; bool _removeDocument(OperationContext* opCtx); + + ExecutorFuture<bool> _removeDocumentUntillSuccessOrStepdown( + std::shared_ptr<executor::TaskExecutor> executor); + ExecutorFuture<void> _acquireLockAsync(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token, StringData resource); diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp index 208dfa58dd9..1070dd84b1a 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp @@ -246,4 +246,10 @@ ShardingDDLCoordinatorService::getOrCreateInstance(OperationContext* opCtx, BSON return coordinator; } + +std::shared_ptr<executor::TaskExecutor> ShardingDDLCoordinatorService::getInstanceCleanupExecutor() + const { + return PrimaryOnlyService::getInstanceCleanupExecutor(); +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.h b/src/mongo/db/s/sharding_ddl_coordinator_service.h index 3f0163ae416..b665fe2d367 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.h +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.h @@ -65,6 +65,7 @@ public: // TODO SERVER-53283 remove the following function after 5.0 became last LTS void waitForAllCoordinatorsToComplete(OperationContext* opCtx) const; + std::shared_ptr<executor::TaskExecutor> getInstanceCleanupExecutor() const; private: ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor, |