diff options
Diffstat (limited to 'src/mongo/db/s/create_collection_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 148 |
1 files changed, 23 insertions, 125 deletions
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index a5b499cfe81..ccbad667d35 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -359,39 +359,8 @@ void broadcastDropCollection(OperationContext* opCtx, } // namespace -CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorService* service, - const BSONObj& initialState) - : ShardingDDLCoordinator(service, initialState), - _doc(CreateCollectionCoordinatorDocument::parse( - IDLParserErrorContext("CreateCollectionCoordinatorDocument"), initialState)), - _request(_doc.getCreateCollectionRequest()), - _critSecReason(BSON("command" - << "createCollection" - << "ns" << nss().toString())) {} - -boost::optional<BSONObj> CreateCollectionCoordinator::reportForCurrentOp( - MongoProcessInterface::CurrentOpConnectionsMode connMode, - MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { - BSONObjBuilder cmdBob; - if (const auto& optComment = getForwardableOpMetadata().getComment()) { - cmdBob.append(optComment.get().firstElement()); - } - cmdBob.appendElements(_request.toBSON()); - - const auto currPhase = [&]() { - stdx::lock_guard l{_docMutex}; - return _doc.getPhase(); - }(); - - BSONObjBuilder bob; - bob.append("type", "op"); - bob.append("desc", "CreateCollectionCoordinator"); - bob.append("op", "command"); - bob.append("ns", nss().toString()); - bob.append("command", cmdBob.obj()); - bob.append("currentPhase", currPhase); - bob.append("active", true); - return bob.obj(); +void CreateCollectionCoordinator::appendCommandInfo(BSONObjBuilder* cmdInfoBuilder) const { + cmdInfoBuilder->appendElements(_request.toBSON()); } void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { @@ -435,9 +404,9 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // Additionally we want to perform a majority write on the CSRS to ensure that // all the subsequent reads will see all the writes performed from a previous // execution of this coordinator. - _doc = _updateSession(opCtx, _doc); + _updateSession(opCtx); _performNoopRetryableWriteOnAllShardsAndConfigsvr( - opCtx, getCurrentSession(_doc), **executor); + opCtx, getCurrentSession(), **executor); } // Log the start of the event only if we're not recovering. @@ -461,7 +430,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( ->releaseRecoverableCriticalSection( opCtx, nss(), - _getCriticalSectionReason(), + _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); _result = createCollectionResponseOpt; @@ -474,10 +443,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( // presence of a stepdown. RecoverableCriticalSectionService::get(opCtx) ->acquireRecoverableCriticalSectionBlockWrites( - opCtx, - nss(), - _getCriticalSectionReason(), - ShardingCatalogClient::kMajorityWriteConcern); + opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); if (!_firstExecution) { auto uuid = sharding_ddl_util::getCollectionUUID(opCtx, nss()); @@ -489,12 +455,11 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( "Removing partial changes from previous run", "namespace"_attr = nss()); - _doc = _updateSession(opCtx, _doc); - cleanupPartialChunksFromPreviousAttempt( - opCtx, *uuid, getCurrentSession(_doc)); + _updateSession(opCtx); + cleanupPartialChunksFromPreviousAttempt(opCtx, *uuid, getCurrentSession()); - _doc = _updateSession(opCtx, _doc); - broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession(_doc)); + _updateSession(opCtx); + broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession()); } } @@ -517,28 +482,18 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( ->promoteRecoverableCriticalSectionToBlockAlsoReads( opCtx, nss(), - _getCriticalSectionReason(), + _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); - _doc = _updateSession(opCtx, _doc); - try { - _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession(_doc)); - } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) { - // Older 5.0 binaries don't support running the - // _shardsvrCreateCollectionParticipant command as a retryable write yet. In - // that case, retry without attaching session info. - _createCollectionOnNonPrimaryShards(opCtx, boost::none); - } + _updateSession(opCtx); + _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession()); _commit(opCtx); } // End of the critical section, from now on, read and writes are permitted. RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection( - opCtx, - nss(), - _getCriticalSectionReason(), - ShardingCatalogClient::kMajorityWriteConcern); + opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); // Slow path. Create chunks (which might incur in an index scan) and commit must be // done outside of the critical section to prevent writes from stalling in unsharded @@ -566,10 +521,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); RecoverableCriticalSectionService::get(opCtx)->releaseRecoverableCriticalSection( - opCtx, - nss(), - _getCriticalSectionReason(), - ShardingCatalogClient::kMajorityWriteConcern); + opCtx, nss(), _critSecReason, ShardingCatalogClient::kMajorityWriteConcern); } return status; }); @@ -751,7 +703,7 @@ void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) { } void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards( - OperationContext* opCtx, const boost::optional<OperationSessionInfo>& osi) { + OperationContext* opCtx, const OperationSessionInfo& osi) { LOGV2_DEBUG(5277905, 2, "Create collection _createCollectionOnNonPrimaryShards", @@ -778,10 +730,9 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards( createCollectionParticipantRequest.setIdIndex(idIndex); createCollectionParticipantRequest.setIndexes(indexes); - requests.emplace_back( - chunkShardId, - CommandHelpers::appendMajorityWriteConcern( - createCollectionParticipantRequest.toBSON(osi ? osi->toBSON() : BSONObj()))); + requests.emplace_back(chunkShardId, + CommandHelpers::appendMajorityWriteConcern( + createCollectionParticipantRequest.toBSON(osi.toBSON()))); initializedShards.emplace(chunkShardId); } @@ -817,8 +768,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss()); // Upsert Chunks. - _doc = _updateSession(opCtx, _doc); - insertChunks(opCtx, _initialChunks->chunks, getCurrentSession(_doc)); + _updateSession(opCtx); + insertChunks(opCtx, _initialChunks->chunks, getCurrentSession()); CollectionType coll(nss(), _initialChunks->collVersion().epoch(), @@ -841,9 +792,9 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { coll.setUnique(*_request.getUnique()); } - _doc = _updateSession(opCtx, _doc); + _updateSession(opCtx); try { - insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc)); + insertCollectionEntry(opCtx, nss(), coll, getCurrentSession()); notifyChangeStreamsOnShardCollection(opCtx, nss(), *_collectionUUID, _request.toBSON()); @@ -927,57 +878,4 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj()); } -// Phase change API. - -void CreateCollectionCoordinator::_enterPhase(Phase newPhase) { - CoordDoc newDoc(_doc); - newDoc.setPhase(newPhase); - - LOGV2_DEBUG(5565600, - 2, - "Create collection coordinator phase transition", - "namespace"_attr = nss(), - "newPhase"_attr = CreateCollectionCoordinatorPhase_serializer(newDoc.getPhase()), - "oldPhase"_attr = CreateCollectionCoordinatorPhase_serializer(_doc.getPhase())); - - if (_doc.getPhase() == Phase::kUnset) { - newDoc = _insertStateDocument(std::move(newDoc)); - } else { - newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); - } - - { - stdx::unique_lock ul{_docMutex}; - _doc = std::move(newDoc); - } -} - -const BSONObj CreateCollectionCoordinatorDocumentPre60Compatible::kPre60IncompatibleFields = - BSON(CreateCollectionRequest::kCollectionUUIDFieldName - << 1 << CreateCollectionRequest::kImplicitlyCreateIndexFieldName << 1 - << CreateCollectionRequest::kEnforceUniquenessCheckFieldName << 1); - -void CreateCollectionCoordinatorDocumentPre60Compatible::serialize(BSONObjBuilder* builder) const { - BSONObjBuilder internalBuilder; - CreateCollectionCoordinatorDocument::serialize(&internalBuilder); - internalBuilder.asTempObj().filterFieldsUndotted(builder, kPre60IncompatibleFields, false); -} - -BSONObj CreateCollectionCoordinatorDocumentPre60Compatible::toBSON() const { - BSONObjBuilder builder; - serialize(&builder); - return builder.obj(); -} - -CreateCollectionCoordinatorPre60Compatible::CreateCollectionCoordinatorPre60Compatible( - ShardingDDLCoordinatorService* service, const BSONObj& initialState) - : CreateCollectionCoordinator(service, initialState), - _critSecReason( - BSON("command" - << "createCollection" - << "ns" << nss().toString() << "request" - << _request.toBSON().filterFieldsUndotted( - CreateCollectionCoordinatorDocumentPre60Compatible::kPre60IncompatibleFields, - false))) {} - } // namespace mongo |