summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/rename_collection_coordinator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/rename_collection_coordinator.cpp')
-rw-r--r--src/mongo/db/s/rename_collection_coordinator.cpp136
1 files changed, 26 insertions, 110 deletions
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp
index 789f8ade994..64680e96cc2 100644
--- a/src/mongo/db/s/rename_collection_coordinator.cpp
+++ b/src/mongo/db/s/rename_collection_coordinator.cpp
@@ -90,9 +90,7 @@ boost::optional<UUID> getCollectionUUID(OperationContext* opCtx,
RenameCollectionCoordinator::RenameCollectionCoordinator(ShardingDDLCoordinatorService* service,
const BSONObj& initialState)
- : ShardingDDLCoordinator(service, initialState),
- _doc(RenameCollectionCoordinatorDocument::parse(
- IDLParserErrorContext("RenameCollectionCoordinatorDocument"), initialState)),
+ : RecoverableShardingDDLCoordinator(service, "RenameCollectionCoordinator", initialState),
_request(_doc.getRenameCollectionRequest()) {}
void RenameCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
@@ -113,54 +111,8 @@ std::vector<StringData> RenameCollectionCoordinator::_acquireAdditionalLocks(
return {_request.getTo().ns()};
}
-boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode connMode,
- MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
-
- BSONObjBuilder cmdBob;
- if (const auto& optComment = getForwardableOpMetadata().getComment()) {
- cmdBob.append(optComment.get().firstElement());
- }
- cmdBob.appendElements(_request.toBSON());
-
- const auto currPhase = [&]() {
- stdx::lock_guard l{_docMutex};
- return _doc.getPhase();
- }();
-
- BSONObjBuilder bob;
- bob.append("type", "op");
- bob.append("desc", "RenameCollectionCoordinator");
- bob.append("op", "command");
- bob.append("ns", nss().toString());
- bob.append("command", cmdBob.obj());
- bob.append("currentPhase", currPhase);
- bob.append("active", true);
- return bob.obj();
-}
-
-void RenameCollectionCoordinator::_enterPhase(Phase newPhase) {
- StateDoc newDoc(_doc);
- newDoc.setPhase(newPhase);
-
- LOGV2_DEBUG(5460501,
- 2,
- "Rename collection coordinator phase transition",
- "fromNs"_attr = nss(),
- "toNs"_attr = _request.getTo(),
- "newPhase"_attr = RenameCollectionCoordinatorPhase_serializer(newDoc.getPhase()),
- "oldPhase"_attr = RenameCollectionCoordinatorPhase_serializer(_doc.getPhase()));
-
- if (_doc.getPhase() == Phase::kUnset) {
- newDoc = _insertStateDocument(std::move(newDoc));
- } else {
- newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
- }
-
- {
- stdx::unique_lock ul{_docMutex};
- _doc = std::move(newDoc);
- }
+void RenameCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const {
+ cmdInfoBuilder->appendElements(_request.toBSON());
}
ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
@@ -275,15 +227,15 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
getForwardableOpMetadata().setOn(opCtx);
if (!_firstExecution) {
- _doc = _updateSession(opCtx, _doc);
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(_doc), **executor);
+ opCtx, getCurrentSession(), **executor);
}
const auto& fromNss = nss();
- _doc = _updateSession(opCtx, _doc);
- const OperationSessionInfo osi = getCurrentSession(_doc);
+ _updateSession(opCtx);
+ const OperationSessionInfo osi = getCurrentSession();
// On participant shards:
// - Block CRUD on source and target collection in case at least one
@@ -303,20 +255,8 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(
renameCollParticipantRequest.toBSON({}));
- try {
- sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- fromNss.db(),
- cmdObj.addFields(osi.toBSON()),
- participants,
- **executor);
-
- } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
- // Older 5.0 binaries don't support running the command as a
- // retryable write yet. In that case, retry without attaching session info.
- sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx, fromNss.db(), cmdObj, participants, **executor);
- }
+ sharding_ddl_util::sendAuthenticatedCommandToShards(
+ opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor);
}))
.then(_executePhase(
Phase::kRenameMetadata,
@@ -325,10 +265,13 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ // For an unsharded collection the CSRS server can not verify the targetUUID.
+ // Use the session ID + txnNumber to ensure no stale requests get through.
+ _updateSession(opCtx);
+
if (!_firstExecution) {
- _doc = _updateSession(opCtx, _doc);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(_doc), **executor);
+ opCtx, getCurrentSession(), **executor);
}
ConfigsvrRenameCollectionMetadata req(nss(), _request.getTo());
@@ -336,28 +279,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(req.toBSON({}));
const auto& configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- // For an unsharded collection the CSRS server can not verify the targetUUID.
- // Use the session ID + txnNumber to ensure no stale requests get through.
- _doc = _updateSession(opCtx, _doc);
- const OperationSessionInfo osi = getCurrentSession(_doc);
-
- try {
- uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(
- configShard->runCommand(opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- cmdObj.addFields(osi.toBSON()),
- Shard::RetryPolicy::kIdempotent)));
- } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
- // Older 5.0 binaries don't support running the command as a
- // retryable write yet. In that case, retry without attaching session info.
- uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(
- configShard->runCommand(opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- "admin",
- cmdObj,
- Shard::RetryPolicy::kIdempotent)));
- }
+ uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(
+ configShard->runCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ cmdObj.addFields(getCurrentSession().toBSON()),
+ Shard::RetryPolicy::kIdempotent)));
}))
.then(_executePhase(
Phase::kUnblockCRUD,
@@ -367,9 +294,9 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
getForwardableOpMetadata().setOn(opCtx);
if (!_firstExecution) {
- _doc = _updateSession(opCtx, _doc);
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(_doc), **executor);
+ opCtx, getCurrentSession(), **executor);
}
const auto& fromNss = nss();
@@ -383,22 +310,11 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl(
unblockParticipantRequest.toBSON({}));
auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
- _doc = _updateSession(opCtx, _doc);
- const OperationSessionInfo osi = getCurrentSession(_doc);
+ _updateSession(opCtx);
+ const OperationSessionInfo osi = getCurrentSession();
- try {
- sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- fromNss.db(),
- cmdObj.addFields(osi.toBSON()),
- participants,
- **executor);
- } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
- // Older 5.0 binaries don't support running the command as a
- // retryable write yet. In that case, retry without attaching session info.
- sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx, fromNss.db(), cmdObj, participants, **executor);
- }
+ sharding_ddl_util::sendAuthenticatedCommandToShards(
+ opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor);
}))
.then(_executePhase(Phase::kSetResponse,
[this, anchor = shared_from_this()] {