diff options
Diffstat (limited to 'src/mongo/db/s/sharding_ddl_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.cpp | 95 |
1 files changed, 66 insertions, 29 deletions
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index 2b112ee4b4e..5e05dae0a09 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -33,12 +33,15 @@ #include "mongo/db/s/sharding_ddl_coordinator.h" -#include "mongo/db/persistent_task_store.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_ddl_coordinator_gen.h" +#include "mongo/db/write_concern.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" +#include "mongo/s/write_ops/batched_command_response.h" namespace mongo { @@ -47,8 +50,10 @@ ShardingDDLCoordinatorMetadata extractShardingDDLCoordinatorMetadata(const BSONO IDLParserErrorContext("ShardingDDLCoordinatorMetadata"), coorDoc); } -ShardingDDLCoordinator::ShardingDDLCoordinator(const BSONObj& coorDoc) - : _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)), +ShardingDDLCoordinator::ShardingDDLCoordinator(ShardingDDLCoordinatorService* service, + const BSONObj& coorDoc) + : _service(service), + _coorMetadata(extractShardingDDLCoordinatorMetadata(coorDoc)), _recoveredFromDisk(_coorMetadata.getRecoveredFromDisk()) {} ShardingDDLCoordinator::~ShardingDDLCoordinator() { @@ -56,17 +61,39 @@ ShardingDDLCoordinator::~ShardingDDLCoordinator() { invariant(_completionPromise.getFuture().isReady()); } -void ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) { - PersistentTaskStore<ShardingDDLCoordinatorMetadata> store( - NamespaceString::kShardingDDLCoordinatorsNamespace); - LOGV2_DEBUG(5565601, - 2, - "Removing sharding DDL coordinator document", - "coordinatorId"_attr = _coorMetadata.getId()); - store.remove( - opCtx, - BSON(ShardingDDLCoordinatorMetadata::kIdFieldName << _coorMetadata.getId().toBSON()), - WriteConcerns::kMajorityWriteConcern); +bool ShardingDDLCoordinator::_removeDocument(OperationContext* opCtx) { + DBDirectClient dbClient(opCtx); + auto commandResponse = dbClient.runCommand([&] { + write_ops::DeleteCommandRequest deleteOp( + NamespaceString::kShardingDDLCoordinatorsNamespace); + + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName + << _coorMetadata.getId().toBSON())); + entry.setMulti(true); + return entry; + }()}); + + return deleteOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + BatchedCommandResponse batchedResponse; + std::string unusedErrmsg; + batchedResponse.parseBSON(commandReply, &unusedErrmsg); + + WriteConcernResult ignoreResult; + const WriteConcernOptions majorityWriteConcern{ + WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kWriteConcernTimeoutSharding}; + auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + uassertStatusOK(waitForWriteConcern(opCtx, latestOpTime, majorityWriteConcern, &ignoreResult)); + + return batchedResponse.getN() > 0; } void ShardingDDLCoordinator::interrupt(Status status) { @@ -134,7 +161,12 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas errorMsg, "coordinatorId"_attr = _coorMetadata.getId(), "reason"_attr = redact(status)); - interrupt(status.withContext(errorMsg)); + + stdx::lock_guard<Latch> lg(_mutex); + if (!_constructionCompletionPromise.getFuture().isReady()) { + _constructionCompletionPromise.setError(status); + } + return status; }) .then([this, executor, token, anchor = shared_from_this()] { @@ -144,28 +176,33 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); - const auto completionStatus = [&] { - if (!status.isOK() && - (status.isA<ErrorCategory::NotPrimaryError>() || - status.isA<ErrorCategory::ShutdownError>())) { - // Do not remove the coordinator document - // if we had a stepdown related error. - return status; - } + auto completionStatus = status; + + // Release the coordinator only if we are not stepping down + if (!status.isA<ErrorCategory::NotPrimaryError>() && + !status.isA<ErrorCategory::ShutdownError>()) { try { - _removeDocument(opCtx); - return status; + LOGV2(5565601, + "Releasing sharding DDL coordinator", + "coordinatorId"_attr = _coorMetadata.getId()); + + const auto docWasRemoved = _removeDocument(opCtx); + + if (!docWasRemoved) { + _service->releaseInstance(BSON(ShardingDDLCoordinatorMetadata::kIdFieldName + << _coorMetadata.getId().toBSON()), + status); + } } catch (DBException& ex) { - static constexpr auto& errMsg = - "Failed to remove sharding DDL coordinator document"; + static constexpr auto errMsg = "Failed to release sharding DDL coordinator"; LOGV2_WARNING(5565605, errMsg, "coordinatorId"_attr = _coorMetadata.getId(), "error"_attr = redact(ex)); - return ex.toStatus(errMsg); + completionStatus = ex.toStatus(errMsg); } - }(); + } while (!_scopedLocks.empty()) { _scopedLocks.top().assignNewOpCtx(opCtx); |