diff options
Diffstat (limited to 'src/mongo/db/s/reshard_collection_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/reshard_collection_coordinator.cpp | 146 |
1 files changed, 105 insertions, 41 deletions
diff --git a/src/mongo/db/s/reshard_collection_coordinator.cpp b/src/mongo/db/s/reshard_collection_coordinator.cpp index 76d75706614..9a9b8af12ba 100644 --- a/src/mongo/db/s/reshard_collection_coordinator.cpp +++ b/src/mongo/db/s/reshard_collection_coordinator.cpp @@ -39,51 +39,115 @@ namespace mongo { -ReshardCollectionCoordinator::ReshardCollectionCoordinator( - OperationContext* opCtx, const ShardsvrReshardCollection& reshardCollectionParams) - : ShardingDDLCoordinator_NORESILIENT(opCtx, reshardCollectionParams.getCommandParameter()), - _serviceContext(opCtx->getServiceContext()), - _requestObj(reshardCollectionParams.serialize({})), - _request(ShardsvrReshardCollection::parse(IDLParserErrorContext("_shardsvrReshardCollection"), - _requestObj)), - _nss(_request.getCommandParameter()) {} - -SemiFuture<void> ReshardCollectionCoordinator::runImpl( - std::shared_ptr<executor::TaskExecutor> executor) { - return ExecutorFuture<void>(executor, Status::OK()) - .then([this, anchor = shared_from_this()]() { - ThreadClient tc("ReshardCollectionCoordinator", _serviceContext); - auto opCtxHolder = tc->makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - _forwardableOpMetadata.setOn(opCtx); - - ConfigsvrReshardCollection configsvrReshardCollection(_nss, _request.getKey()); - configsvrReshardCollection.setDbName(_request.getDbName()); - configsvrReshardCollection.setUnique(_request.getUnique()); - configsvrReshardCollection.setCollation(_request.getCollation()); - configsvrReshardCollection.set_presetReshardedChunks( - _request.get_presetReshardedChunks()); - configsvrReshardCollection.setZones(_request.getZones()); - configsvrReshardCollection.setNumInitialChunks(_request.getNumInitialChunks()); - - auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - const auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - CommandHelpers::appendMajorityWriteConcern(configsvrReshardCollection.toBSON({}), - opCtx->getWriteConcern()), - Shard::RetryPolicy::kIdempotent)); - uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(std::move(cmdResponse))); - }) +ReshardCollectionCoordinator::ReshardCollectionCoordinator(ShardingDDLCoordinatorService* service, + const BSONObj& initialState) + : ReshardCollectionCoordinator(service, initialState, true /* persistCoordinatorDocument */) {} + +ReshardCollectionCoordinator::ReshardCollectionCoordinator(ShardingDDLCoordinatorService* service, + const BSONObj& initialState, + bool persistCoordinatorDocument) + : ShardingDDLCoordinator(service, initialState), + _initialState(initialState.getOwned()), + _doc(ReshardCollectionCoordinatorDocument::parse( + IDLParserErrorContext("ReshardCollectionCoordinatorDocument"), _initialState)), + _persistCoordinatorDocument(persistCoordinatorDocument) {} + +void ReshardCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { + const auto otherDoc = ReshardCollectionCoordinatorDocument::parse( + IDLParserErrorContext("ReshardCollectionCoordinatorDocument"), doc); + + uassert(ErrorCodes::ConflictingOperationInProgress, + "Another reshard collection with different arguments is already running for the same " + "namespace", + SimpleBSONObjComparator::kInstance.evaluate( + _doc.getReshardCollectionRequest().toBSON() == + otherDoc.getReshardCollectionRequest().toBSON())); +} + +boost::optional<BSONObj> ReshardCollectionCoordinator::reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { + BSONObjBuilder cmdBob; + if (const auto& optComment = getForwardableOpMetadata().getComment()) { + cmdBob.append(optComment.get().firstElement()); + } + cmdBob.appendElements(_doc.getReshardCollectionRequest().toBSON()); + + BSONObjBuilder bob; + bob.append("type", "op"); + bob.append("desc", "ReshardCollectionCoordinator"); + bob.append("op", "command"); + bob.append("ns", nss().toString()); + bob.append("command", cmdBob.obj()); + bob.append("active", true); + return bob.obj(); +} + +void ReshardCollectionCoordinator::_enterPhase(Phase newPhase) { + if (!_persistCoordinatorDocument) { + return; + } + + StateDoc newDoc(_doc); + newDoc.setPhase(newPhase); + + LOGV2_DEBUG(6206400, + 2, + "Reshard collection coordinator phase transition", + "namespace"_attr = nss(), + "newPhase"_attr = ReshardCollectionCoordinatorPhase_serializer(newDoc.getPhase()), + "oldPhase"_attr = ReshardCollectionCoordinatorPhase_serializer(_doc.getPhase())); + + if (_doc.getPhase() == Phase::kUnset) { + _doc = _insertStateDocument(std::move(newDoc)); + return; + } + _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); +} + +ExecutorFuture<void> ReshardCollectionCoordinator::_runImpl( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept { + return ExecutorFuture<void>(**executor) + .then(_executePhase( + Phase::kReshard, + [this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + + ConfigsvrReshardCollection configsvrReshardCollection(nss(), _doc.getKey()); + configsvrReshardCollection.setDbName(nss().db()); + configsvrReshardCollection.setUnique(_doc.getUnique()); + configsvrReshardCollection.setCollation(_doc.getCollation()); + configsvrReshardCollection.set_presetReshardedChunks( + _doc.get_presetReshardedChunks()); + configsvrReshardCollection.setZones(_doc.getZones()); + configsvrReshardCollection.setNumInitialChunks(_doc.getNumInitialChunks()); + + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + const auto cmdResponse = + uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + NamespaceString::kAdminDb.toString(), + CommandHelpers::appendMajorityWriteConcern( + configsvrReshardCollection.toBSON({}), opCtx->getWriteConcern()), + Shard::RetryPolicy::kIdempotent)); + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(std::move(cmdResponse))); + })) .onError([this, anchor = shared_from_this()](const Status& status) { - LOGV2_ERROR(5276001, + LOGV2_ERROR(6206401, "Error running reshard collection", - "namespace"_attr = _nss, + "namespace"_attr = nss(), "error"_attr = redact(status)); return status; - }) - .semi(); + }); } +ReshardCollectionCoordinator_NORESILIENT::ReshardCollectionCoordinator_NORESILIENT( + ShardingDDLCoordinatorService* service, const BSONObj& initialState) + : ReshardCollectionCoordinator(service, initialState, false /* persistCoordinatorDocument */) {} + } // namespace mongo |