Diffstat (limited to 'src/mongo/db/s/create_collection_coordinator.cpp')
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp | 148
1 file changed, 23 insertions(+), 125 deletions(-)
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index a5b499cfe81..ccbad667d35 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -359,39 +359,8 @@ void broadcastDropCollection(OperationContext* opCtx,
} // namespace
-CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorService* service,
- const BSONObj& initialState)
- : ShardingDDLCoordinator(service, initialState),
- _doc(CreateCollectionCoordinatorDocument::parse(
- IDLParserErrorContext("CreateCollectionCoordinatorDocument"), initialState)),
- _request(_doc.getCreateCollectionRequest()),
- _critSecReason(BSON("command"
- << "createCollection"
- << "ns" << nss().toString())) {}
-
-boost::optional<BSONObj> CreateCollectionCoordinator::reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode connMode,
- MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
- BSONObjBuilder cmdBob;
- if (const auto& optComment = getForwardableOpMetadata().getComment()) {
- cmdBob.append(optComment.get().firstElement());
- }
- cmdBob.appendElements(_request.toBSON());
-
- const auto currPhase = [&]() {
- stdx::lock_guard l{_docMutex};
- return _doc.getPhase();
- }();
-
- BSONObjBuilder bob;
- bob.append("type", "op");
- bob.append("desc", "CreateCollectionCoordinator");
- bob.append("op", "command");
- bob.append("ns", nss().toString());
- bob.append("command", cmdBob.obj());
- bob.append("currentPhase", currPhase);
- bob.append("active", true);
- return bob.obj();
+void CreateCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const {
+ cmdInfoBuilder->appendElements(_request.toBSON());
}
void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const {
@@ -435,9 +404,9 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// Additionally we want to perform a majority write on the CSRS to ensure that
// all the subsequent reads will see all the writes performed from a previous
// execution of this coordinator.
- _doc = _updateSession(opCtx, _doc);
+ _updateSession(opCtx);
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
- opCtx, getCurrentSession(_doc), **executor);
+ opCtx, getCurrentSession(), **executor);
}
// Log the start of the event only if we're not recovering.
@@ -461,7 +430,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
->releaseRecoverableCriticalSection(
opCtx,
nss(),
- _getCriticalSectionReason(),
+ _critSecReason,
ShardingCatalogClient::kMajorityWriteConcern);
_result = createCollectionResponseOpt;
@@ -474,10 +443,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// presence of a stepdown.
RecoverableCriticalSectionService::get(opCtx)
->acquireRecoverableCriticalSectionBlockWrites(
- opCtx,
- nss(),
- _getCriticalSectionReason(),
- ShardingCatalogClient::kMajorityWriteConcern);
+ opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
if (!_firstExecution) {
auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss());
@@ -489,12 +455,11 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
"Removing partial changes from previous run",
"namespace"_attr = nss());
- _doc = _updateSession(opCtx, _doc);
- cleanupPartialChunksFromPreviousAttempt(
- opCtx, *uuid, getCurrentSession(_doc));
+ _updateSession(opCtx);
+ cleanupPartialChunksFromPreviousAttempt(opCtx, *uuid, getCurrentSession());
- _doc = _updateSession(opCtx, _doc);
- broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession(_doc));
+ _updateSession(opCtx);
+ broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession());
}
}
@@ -517,28 +482,18 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
->promoteRecoverableCriticalSectionToBlockAlsoReads(
opCtx,
nss(),
- _getCriticalSectionReason(),
+ _critSecReason,
ShardingCatalogClient::kMajorityWriteConcern);
- _doc = _updateSession(opCtx, _doc);
- try {
- _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession(_doc));
- } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
- // Older 5.0 binaries don't support running the
- // _shardsvrCreateCollectionParticipant command as a retryable write yet. In
- // that case, retry without attaching session info.
- _createCollectionOnNonPrimaryShards(opCtx, boost::none);
- }
+ _updateSession(opCtx);
+ _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession());
_commit(opCtx);
}
// End of the critical section, from now on, read and writes are permitted.
RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx,
- nss(),
- _getCriticalSectionReason(),
- ShardingCatalogClient::kMajorityWriteConcern);
+ opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
// Slow path. Create chunks (which might incur in an index scan) and commit must be
// done outside of the critical section to prevent writes from stalling in unsharded
@@ -566,10 +521,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection(
- opCtx,
- nss(),
- _getCriticalSectionReason(),
- ShardingCatalogClient::kMajorityWriteConcern);
+ opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern);
}
return status;
});
@@ -751,7 +703,7 @@ void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
}
void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
- OperationContext* opCtx, const boost::optional<OperationSessionInfo>& osi) {
+ OperationContext* opCtx, const OperationSessionInfo& osi) {
LOGV2_DEBUG(5277905,
2,
"Create collection _createCollectionOnNonPrimaryShards",
@@ -778,10 +730,9 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
createCollectionParticipantRequest.setIdIndex(idIndex);
createCollectionParticipantRequest.setIndexes(indexes);
- requests.emplace_back(
- chunkShardId,
- CommandHelpers::appendMajorityWriteConcern(
- createCollectionParticipantRequest.toBSON(osi ? osi->toBSON() : BSONObj())));
+ requests.emplace_back(chunkShardId,
+ CommandHelpers::appendMajorityWriteConcern(
+ createCollectionParticipantRequest.toBSON(osi.toBSON())));
initializedShards.emplace(chunkShardId);
}
@@ -817,8 +768,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
// Upsert Chunks.
- _doc = _updateSession(opCtx, _doc);
- insertChunks(opCtx, _initialChunks->chunks, getCurrentSession(_doc));
+ _updateSession(opCtx);
+ insertChunks(opCtx, _initialChunks->chunks, getCurrentSession());
CollectionType coll(nss(),
_initialChunks->collVersion().epoch(),
@@ -841,9 +792,9 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
coll.setUnique(*_request.getUnique());
}
- _doc = _updateSession(opCtx, _doc);
+ _updateSession(opCtx);
try {
- insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc));
+ insertCollectionEntry(opCtx, nss(), coll, getCurrentSession());
notifyChangeStreamsOnShardCollection(opCtx, nss(), *_collectionUUID, _request.toBSON());
@@ -927,57 +878,4 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
}
-// Phase change API.
-
-void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
- CoordDoc newDoc(_doc);
- newDoc.setPhase(newPhase);
-
- LOGV2_DEBUG(5565600,
- 2,
- "Create collection coordinator phase transition",
- "namespace"_attr = nss(),
- "newPhase"_attr = CreateCollectionCoordinatorPhase_serializer(newDoc.getPhase()),
- "oldPhase"_attr = CreateCollectionCoordinatorPhase_serializer(_doc.getPhase()));
-
- if (_doc.getPhase() == Phase::kUnset) {
- newDoc = _insertStateDocument(std::move(newDoc));
- } else {
- newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc));
- }
-
- {
- stdx::unique_lock ul{_docMutex};
- _doc = std::move(newDoc);
- }
-}
-
-const BSONObj CreateCollectionCoordinatorDocumentPre60Compatible::kPre60IncompatibleFields =
- BSON(CreateCollectionRequest::kCollectionUUIDFieldName
- << 1 << CreateCollectionRequest::kImplicitlyCreateIndexFieldName << 1
- << CreateCollectionRequest::kEnforceUniquenessCheckFieldName << 1);
-
-void CreateCollectionCoordinatorDocumentPre60Compatible::serialize(BSONObjBuilder* builder) const {
- BSONObjBuilder internalBuilder;
- CreateCollectionCoordinatorDocument::serialize(&internalBuilder);
- internalBuilder.asTempObj().filterFieldsUndotted(builder, kPre60IncompatibleFields, false);
-}
-
-BSONObj CreateCollectionCoordinatorDocumentPre60Compatible::toBSON() const {
- BSONObjBuilder builder;
- serialize(&builder);
- return builder.obj();
-}
-
-CreateCollectionCoordinatorPre60Compatible::CreateCollectionCoordinatorPre60Compatible(
- ShardingDDLCoordinatorService* service, const BSONObj& initialState)
- : CreateCollectionCoordinator(service, initialState),
- _critSecReason(
- BSON("command"
- << "createCollection"
- << "ns" << nss().toString() << "request"
- << _request.toBSON().filterFieldsUndotted(
- CreateCollectionCoordinatorDocumentPre60Compatible::kPre60IncompatibleFields,
- false))) {}
-
} // namespace mongo
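
The central refactor in this diff replaces the pattern of threading the coordinator document through every call site (`_doc = _updateSession(opCtx, _doc)` / `getCurrentSession(_doc)`) with argument-free helpers, so the document and its mutex are owned and mutated in one place. The following is a minimal, self-contained sketch of that shape only; the names used here (SessionInfo, StateDoc, DemoCoordinator) are illustrative stand-ins and not the actual ShardingDDLCoordinator or OperationSessionInfo code.

// Minimal sketch (not MongoDB code): a coordinator that owns its state
// document and exposes argument-free session helpers, mirroring the
// refactor above where callers stop passing the document around.
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>

struct SessionInfo {           // stand-in for a retryable-write session
    std::string lsid;
    std::int64_t txnNumber = 0;
};

struct StateDoc {              // stand-in for the coordinator document
    SessionInfo session;
    std::string phase = "kUnset";
};

class DemoCoordinator {
public:
    explicit DemoCoordinator(std::string lsid) {
        _doc.session.lsid = std::move(lsid);
    }

    // Bumps the txnNumber; the real coordinator would also persist the
    // updated document. Callers no longer receive or pass back the doc.
    void updateSession() {
        std::lock_guard<std::mutex> lk(_docMutex);
        ++_doc.session.txnNumber;
    }

    // Returns the session currently recorded in the owned document.
    SessionInfo getCurrentSession() const {
        std::lock_guard<std::mutex> lk(_docMutex);
        return _doc.session;
    }

private:
    mutable std::mutex _docMutex;  // guards _doc, as in the coordinator
    StateDoc _doc;
};

int main() {
    DemoCoordinator coord("session-A");
    coord.updateSession();                    // e.g. before a retryable write
    auto osi = coord.getCurrentSession();
    std::cout << osi.lsid << " txn " << osi.txnNumber << "\n";  // session-A txn 1
    return 0;
}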