diff options
Diffstat (limited to 'src/mongo/db/s/rename_collection_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp | 136 |
1 files changed, 26 insertions, 110 deletions
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp index 789f8ade994..64680e96cc2 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -90,9 +90,7 @@ boost::optional<UUID> getCollectionUUID(OperationContext* opCtx, RenameCollectionCoordinator::RenameCollectionCoordinator(ShardingDDLCoordinatorService* service, const BSONObj& initialState) - : ShardingDDLCoordinator(service, initialState), - _doc(RenameCollectionCoordinatorDocument::parse( - IDLParserErrorContext("RenameCollectionCoordinatorDocument"), initialState)), + : RecoverableShardingDDLCoordinator(service, "RenameCollectionCoordinator", initialState), _request(_doc.getRenameCollectionRequest()) {} void RenameCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { @@ -113,54 +111,8 @@ std::vector<StringData> RenameCollectionCoordinator::_acquireAdditionalLocks( return {_request.getTo().ns()}; } -boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp( - MongoProcessInterface::CurrentOpConnectionsMode connMode, - MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { - - BSONObjBuilder cmdBob; - if (const auto& optComment = getForwardableOpMetadata().getComment()) { - cmdBob.append(optComment.get().firstElement()); - } - cmdBob.appendElements(_request.toBSON()); - - const auto currPhase = [&]() { - stdx::lock_guard l{_docMutex}; - return _doc.getPhase(); - }(); - - BSONObjBuilder bob; - bob.append("type", "op"); - bob.append("desc", "RenameCollectionCoordinator"); - bob.append("op", "command"); - bob.append("ns", nss().toString()); - bob.append("command", cmdBob.obj()); - bob.append("currentPhase", currPhase); - bob.append("active", true); - return bob.obj(); -} - -void RenameCollectionCoordinator::_enterPhase(Phase newPhase) { - StateDoc newDoc(_doc); - newDoc.setPhase(newPhase); - - LOGV2_DEBUG(5460501, - 2, - "Rename collection coordinator phase transition", - "fromNs"_attr = nss(), - "toNs"_attr = _request.getTo(), - "newPhase"_attr = RenameCollectionCoordinatorPhase_serializer(newDoc.getPhase()), - "oldPhase"_attr = RenameCollectionCoordinatorPhase_serializer(_doc.getPhase())); - - if (_doc.getPhase() == Phase::kUnset) { - newDoc = _insertStateDocument(std::move(newDoc)); - } else { - newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); - } - - { - stdx::unique_lock ul{_docMutex}; - _doc = std::move(newDoc); - } +void RenameCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const { + cmdInfoBuilder->appendElements(_request.toBSON()); } ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( @@ -275,15 +227,15 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); if (!_firstExecution) { - _doc = _updateSession(opCtx, _doc); + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(_doc), **executor); + opCtx, getCurrentSession(), **executor); } const auto& fromNss = nss(); - _doc = _updateSession(opCtx, _doc); - const OperationSessionInfo osi = getCurrentSession(_doc); + _updateSession(opCtx); + const OperationSessionInfo osi = getCurrentSession(); // On participant shards: // - Block CRUD on source and target collection in case at least one @@ -303,20 +255,8 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( const auto cmdObj = CommandHelpers::appendMajorityWriteConcern( renameCollParticipantRequest.toBSON({})); - try { - sharding_ddl_util::sendAuthenticatedCommandToShards( - opCtx, - fromNss.db(), - cmdObj.addFields(osi.toBSON()), - participants, - **executor); - - } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) { - // Older 5.0 binaries don't support running the command as a - // retryable write yet. In that case, retry without attaching session info. - sharding_ddl_util::sendAuthenticatedCommandToShards( - opCtx, fromNss.db(), cmdObj, participants, **executor); - } + sharding_ddl_util::sendAuthenticatedCommandToShards( + opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor); })) .then(_executePhase( Phase::kRenameMetadata, @@ -325,10 +265,13 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); getForwardableOpMetadata().setOn(opCtx); + // For an unsharded collection the CSRS server can not verify the targetUUID. + // Use the session ID + txnNumber to ensure no stale requests get through. + _updateSession(opCtx); + if (!_firstExecution) { - _doc = _updateSession(opCtx, _doc); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(_doc), **executor); + opCtx, getCurrentSession(), **executor); } ConfigsvrRenameCollectionMetadata req(nss(), _request.getTo()); @@ -336,28 +279,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(req.toBSON({})); const auto& configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - // For an unsharded collection the CSRS server can not verify the targetUUID. - // Use the session ID + txnNumber to ensure no stale requests get through. - _doc = _updateSession(opCtx, _doc); - const OperationSessionInfo osi = getCurrentSession(_doc); - - try { - uassertStatusOK(Shard::CommandResponse::getEffectiveStatus( - configShard->runCommand(opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - cmdObj.addFields(osi.toBSON()), - Shard::RetryPolicy::kIdempotent))); - } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) { - // Older 5.0 binaries don't support running the command as a - // retryable write yet. In that case, retry without attaching session info. - uassertStatusOK(Shard::CommandResponse::getEffectiveStatus( - configShard->runCommand(opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - cmdObj, - Shard::RetryPolicy::kIdempotent))); - } + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus( + configShard->runCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + "admin", + cmdObj.addFields(getCurrentSession().toBSON()), + Shard::RetryPolicy::kIdempotent))); })) .then(_executePhase( Phase::kUnblockCRUD, @@ -367,9 +294,9 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); if (!_firstExecution) { - _doc = _updateSession(opCtx, _doc); + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(_doc), **executor); + opCtx, getCurrentSession(), **executor); } const auto& fromNss = nss(); @@ -383,22 +310,11 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( unblockParticipantRequest.toBSON({})); auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); - _doc = _updateSession(opCtx, _doc); - const OperationSessionInfo osi = getCurrentSession(_doc); + _updateSession(opCtx); + const OperationSessionInfo osi = getCurrentSession(); - try { - sharding_ddl_util::sendAuthenticatedCommandToShards( - opCtx, - fromNss.db(), - cmdObj.addFields(osi.toBSON()), - participants, - **executor); - } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) { - // Older 5.0 binaries don't support running the command as a - // retryable write yet. In that case, retry without attaching session info. - sharding_ddl_util::sendAuthenticatedCommandToShards( - opCtx, fromNss.db(), cmdObj, participants, **executor); - } + sharding_ddl_util::sendAuthenticatedCommandToShards( + opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor); })) .then(_executePhase(Phase::kSetResponse, [this, anchor = shared_from_this()] { |