diff options
author | Enrico Golfieri <enrico.golfieri@mongodb.com> | 2022-05-10 12:26:21 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-02 11:03:14 +0000 |
commit | 42f0d7d83d2b7c297202c51cbe186dae2be61348 (patch) | |
tree | 27bda286381adba8ef020ab0973853cd8ecce2fd | |
parent | 60f659abea5b129f538106b1fe3f6dfc8b621ffa (diff) | |
download | mongo-42f0d7d83d2b7c297202c51cbe186dae2be61348.tar.gz |
SERVER-62432 Ensure safe access to ShardingDDLCoordinator instance state documents
(cherry picked from commit c4522f8d57ded742ff66a303fb264f517c06502a)
18 files changed, 263 insertions(+), 140 deletions(-)
diff --git a/src/mongo/db/s/collmod_coordinator.cpp b/src/mongo/db/s/collmod_coordinator.cpp index 3ad2fe44440..9e75175964f 100644 --- a/src/mongo/db/s/collmod_coordinator.cpp +++ b/src/mongo/db/s/collmod_coordinator.cpp @@ -75,17 +75,17 @@ bool hasTimeSeriesGranularityUpdate(const CollModRequest& request) { CollModCoordinator::CollModCoordinator(ShardingDDLCoordinatorService* service, const BSONObj& initialState) - : ShardingDDLCoordinator(service, initialState) { - _initialState = initialState.getOwned(); - _doc = CollModCoordinatorDocument::parse(IDLParserErrorContext("CollModCoordinatorDocument"), - _initialState); -} + : ShardingDDLCoordinator(service, initialState), + _initialState{initialState.getOwned()}, + _doc{CollModCoordinatorDocument::parse(IDLParserErrorContext("CollModCoordinatorDocument"), + _initialState)}, + _request{_doc.getCollModRequest()} {} void CollModCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { const auto otherDoc = CollModCoordinatorDocument::parse(IDLParserErrorContext("CollModCoordinatorDocument"), doc); - const auto& selfReq = _doc.getCollModRequest().toBSON(); + const auto& selfReq = _request.toBSON(); const auto& otherReq = otherDoc.getCollModRequest().toBSON(); uassert(ErrorCodes::ConflictingOperationInProgress, @@ -102,14 +102,20 @@ boost::optional<BSONObj> CollModCoordinator::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } - cmdBob.appendElements(_doc.getCollModRequest().toBSON()); + + const auto currPhase = [&]() { + stdx::lock_guard l{_docMutex}; + return _doc.getPhase(); + }(); + + cmdBob.appendElements(_request.toBSON()); BSONObjBuilder bob; bob.append("type", "op"); bob.append("desc", "CollModCoordinator"); bob.append("op", "command"); bob.append("ns", nss().toString()); bob.append("command", cmdBob.obj()); - bob.append("currentPhase", _doc.getPhase()); + bob.append("currentPhase", currPhase); bob.append("active", 
true); return bob.obj(); } @@ -126,10 +132,15 @@ void CollModCoordinator::_enterPhase(Phase newPhase) { "oldPhase"_attr = CollModCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } void CollModCoordinator::_performNoopRetryableWriteOnParticipants( @@ -191,13 +202,12 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( { AutoGetCollection coll{ opCtx, nss(), MODE_IS, AutoGetCollectionViewMode::kViewsPermitted}; - checkCollectionUUIDMismatch( - opCtx, nss(), *coll, _doc.getCollModRequest().getCollectionUUID()); + checkCollectionUUIDMismatch(opCtx, nss(), *coll, _request.getCollectionUUID()); } _saveCollectionInfoOnCoordinatorIfNecessary(opCtx); - auto isGranularityUpdate = hasTimeSeriesGranularityUpdate(_doc.getCollModRequest()); + auto isGranularityUpdate = hasTimeSeriesGranularityUpdate(_request); uassert(6201808, "Cannot use time-series options for a non-timeseries collection", _collInfo->timeSeriesOptions || !isGranularityUpdate); @@ -207,7 +217,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( "from 'seconds' to 'minutes' or 'minutes' to 'hours'.", timeseries::isValidTimeseriesGranularityTransition( _collInfo->timeSeriesOptions->getGranularity(), - *_doc.getCollModRequest().getTimeseries()->getGranularity())); + *_request.getTimeseries()->getGranularity())); } }) .then(_executePhase( @@ -229,8 +239,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( _saveShardingInfoOnCoordinatorIfNecessary(opCtx); - if (_collInfo->isSharded && - hasTimeSeriesGranularityUpdate(_doc.getCollModRequest())) { + if (_collInfo->isSharded && 
hasTimeSeriesGranularityUpdate(_request)) { ShardsvrParticipantBlock blockCRUDOperationsRequest(_collInfo->nsForTargeting); const auto cmdObj = CommandHelpers::appendMajorityWriteConcern( blockCRUDOperationsRequest.toBSON({})); @@ -253,8 +262,8 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( _saveShardingInfoOnCoordinatorIfNecessary(opCtx); if (_collInfo->isSharded && _collInfo->timeSeriesOptions && - hasTimeSeriesGranularityUpdate(_doc.getCollModRequest())) { - ConfigsvrCollMod request(_collInfo->nsForTargeting, _doc.getCollModRequest()); + hasTimeSeriesGranularityUpdate(_request)) { + ConfigsvrCollMod request(_collInfo->nsForTargeting, _request); const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(request.toBSON({})); @@ -280,9 +289,9 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( _saveShardingInfoOnCoordinatorIfNecessary(opCtx); if (_collInfo->isSharded) { - ShardsvrCollModParticipant request(nss(), _doc.getCollModRequest()); - bool needsUnblock = _collInfo->timeSeriesOptions && - hasTimeSeriesGranularityUpdate(_doc.getCollModRequest()); + ShardsvrCollModParticipant request(nss(), _request); + bool needsUnblock = + _collInfo->timeSeriesOptions && hasTimeSeriesGranularityUpdate(_request); request.setNeedsUnblock(needsUnblock); std::vector<AsyncRequestsSender::Response> responses; @@ -327,7 +336,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl( sharding_ddl_util::resumeMigrations(opCtx, nss(), _doc.getCollUUID()); } else { CollMod cmd(nss()); - cmd.setCollModRequest(_doc.getCollModRequest()); + cmd.setCollModRequest(_request); BSONObjBuilder collModResBuilder; uassertStatusOK(timeseries::processCollModCommandWithTimeSeriesTranslation( opCtx, nss(), cmd, true, &collModResBuilder)); diff --git a/src/mongo/db/s/collmod_coordinator.h b/src/mongo/db/s/collmod_coordinator.h index 98243d8bdbb..b85b6b16d5a 100644 --- a/src/mongo/db/s/collmod_coordinator.h +++ b/src/mongo/db/s/collmod_coordinator.h @@ -108,7 +108,11 @@ private: void 
_saveShardingInfoOnCoordinatorIfNecessary(OperationContext* opCtx); BSONObj _initialState; + mutable Mutex _docMutex = MONGO_MAKE_LATCH("CollModCoordinator::_docMutex"); CollModCoordinatorDocument _doc; + + const mongo::CollModRequest _request; + boost::optional<BSONObj> _result; boost::optional<CollectionInfo> _collInfo; boost::optional<ShardingInfo> _shardingInfo; diff --git a/src/mongo/db/s/collmod_coordinator_pre60_compatible.cpp b/src/mongo/db/s/collmod_coordinator_pre60_compatible.cpp index fad015c8fd7..10cd428adae 100644 --- a/src/mongo/db/s/collmod_coordinator_pre60_compatible.cpp +++ b/src/mongo/db/s/collmod_coordinator_pre60_compatible.cpp @@ -96,6 +96,12 @@ boost::optional<BSONObj> CollModCoordinatorPre60Compatible::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } + + const auto currPhase = [&]() { + stdx::lock_guard l{_docMutex}; + return _doc.getPhase(); + }(); + cmdBob.appendElements(_doc.getCollModRequest().toBSON()); BSONObjBuilder bob; bob.append("type", "op"); @@ -103,7 +109,7 @@ boost::optional<BSONObj> CollModCoordinatorPre60Compatible::reportForCurrentOp( bob.append("op", "command"); bob.append("ns", nss().toString()); bob.append("command", cmdBob.obj()); - bob.append("currentPhase", _doc.getPhase()); + bob.append("currentPhase", currPhase); bob.append("active", true); return bob.obj(); } @@ -120,10 +126,15 @@ void CollModCoordinatorPre60Compatible::_enterPhase(Phase newPhase) { "oldPhase"_attr = CollModCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } 
void CollModCoordinatorPre60Compatible::_performNoopRetryableWriteOnParticipants( diff --git a/src/mongo/db/s/collmod_coordinator_pre60_compatible.h b/src/mongo/db/s/collmod_coordinator_pre60_compatible.h index 29c250dc452..a8de0c67f53 100644 --- a/src/mongo/db/s/collmod_coordinator_pre60_compatible.h +++ b/src/mongo/db/s/collmod_coordinator_pre60_compatible.h @@ -32,6 +32,7 @@ #include "mongo/db/s/collmod_coordinator_document_gen.h" #include "mongo/db/s/sharding_ddl_coordinator.h" #include "mongo/s/request_types/sharded_ddl_commands_gen.h" +#include "mongo/stdx/mutex.h" namespace mongo { @@ -61,6 +62,7 @@ public: private: ShardingDDLCoordinatorMetadata const& metadata() const override { + stdx::lock_guard l{_docMutex}; return _doc.getShardingDDLCoordinatorMetadata(); } @@ -90,7 +92,9 @@ private: OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor); BSONObj _initialState; + mutable Mutex _docMutex = MONGO_MAKE_LATCH("CollModCoordinatorPre60Compatible::_docMutex"); CollModCoordinatorDocument _doc; + boost::optional<BSONObj> _result; }; diff --git a/src/mongo/db/s/compact_structured_encryption_data_coordinator.cpp b/src/mongo/db/s/compact_structured_encryption_data_coordinator.cpp index 3fcfe10062a..94125c8e167 100644 --- a/src/mongo/db/s/compact_structured_encryption_data_coordinator.cpp +++ b/src/mongo/db/s/compact_structured_encryption_data_coordinator.cpp @@ -186,18 +186,41 @@ boost::optional<BSONObj> CompactStructuredEncryptionDataCoordinator::reportForCu MongoProcessInterface::CurrentOpConnectionsMode connMode, MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { BSONObjBuilder bob; + + CompactStructuredEncryptionDataPhaseEnum currPhase; + std::string nss; + std::string escNss; + std::string eccNss; + std::string ecoNss; + std::string ecocNss; + std::string ecocRenameUuid; + std::string ecocUiid; + std::string ecocRenameNss; + { + stdx::lock_guard l{_docMutex}; + currPhase = _doc.getPhase(); + nss = 
_doc.getId().getNss().ns(); + escNss = _doc.getEscNss().ns(); + eccNss = _doc.getEccNss().ns(); + ecoNss = _doc.getEcocNss().ns(); + ecocNss = _doc.getEcocNss().ns(); + ecocRenameUuid = + _doc.getEcocRenameUuid() ? _doc.getEcocRenameUuid().value().toString() : "none"; + ecocUiid = _doc.getEcocUuid() ? _doc.getEcocUuid().value().toString() : "none"; + ecocRenameNss = _doc.getEcocRenameNss().ns(); + } + bob.append("type", "op"); bob.append("desc", "CompactStructuredEncryptionDataCoordinator"); bob.append("op", "command"); - bob.append("nss", _doc.getId().getNss().ns()); - bob.append("escNss", _doc.getEscNss().ns()); - bob.append("eccNss", _doc.getEccNss().ns()); - bob.append("ecocNss", _doc.getEcocNss().ns()); - bob.append("ecocUuid", _doc.getEcocUuid() ? _doc.getEcocUuid().value().toString() : "none"); - bob.append("ecocRenameNss", _doc.getEcocRenameNss().ns()); - bob.append("ecocRenameUuid", - _doc.getEcocRenameUuid() ? _doc.getEcocRenameUuid().value().toString() : "none"); - bob.append("currentPhase", _doc.getPhase()); + bob.append("nss", nss); + bob.append("escNss", escNss); + bob.append("eccNss", eccNss); + bob.append("ecocNss", ecocNss); + bob.append("ecocUuid", ecocUiid); + bob.append("ecocRenameNss", ecocRenameNss); + bob.append("ecocRenameUuid", ecocRenameUuid); + bob.append("currentPhase", currPhase); bob.append("active", true); return bob.obj(); } @@ -224,12 +247,16 @@ void CompactStructuredEncryptionDataCoordinator::_enterPhase(Phase newPhase) { if (_doc.getPhase() == Phase::kRenameEcocForCompact) { doc.setSkipCompact(_skipCompact); doc.setEcocRenameUuid(_ecocRenameUuid); - _doc = _insertStateDocument(std::move(doc)); - return; + doc = _insertStateDocument(std::move(doc)); + } else { + auto opCtx = cc().makeOperationContext(); + doc = _updateStateDocument(opCtx.get(), std::move(doc)); } - auto opCtx = cc().makeOperationContext(); - _doc = _updateStateDocument(opCtx.get(), std::move(doc)); + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(doc); + } 
} ExecutorFuture<void> CompactStructuredEncryptionDataCoordinator::_runImpl( diff --git a/src/mongo/db/s/compact_structured_encryption_data_coordinator.h b/src/mongo/db/s/compact_structured_encryption_data_coordinator.h index f6a2bc9d922..4b8ffd33441 100644 --- a/src/mongo/db/s/compact_structured_encryption_data_coordinator.h +++ b/src/mongo/db/s/compact_structured_encryption_data_coordinator.h @@ -60,10 +60,6 @@ public: return *_response; } - const NamespaceString& nss() const { - return _doc.getId().getNss(); - } - void checkIfOptionsConflict(const BSONObj& doc) const final {} private: @@ -93,7 +89,10 @@ private: const CancellationToken& token) noexcept final; private: + mutable Mutex _docMutex = + MONGO_MAKE_LATCH("CompactStructuredEncryptionDataCoordinator::_docMutex"); StateDoc _doc; + boost::optional<CompactStructuredEncryptionDataCommandReply> _response; bool _skipCompact{false}; boost::optional<UUID> _ecocRenameUuid; diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index d3ec9d03230..7501bd05b9b 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -397,6 +397,7 @@ CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorS : ShardingDDLCoordinator(service, initialState), _doc(CreateCollectionCoordinatorDocument::parse( IDLParserErrorContext("CreateCollectionCoordinatorDocument"), initialState)), + _request(_doc.getCreateCollectionRequest()), _critSecReason(BSON("command" << "createCollection" << "ns" << nss().toString())) {} @@ -408,7 +409,12 @@ boost::optional<BSONObj> CreateCollectionCoordinator::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } - cmdBob.appendElements(_doc.getCreateCollectionRequest().toBSON()); + cmdBob.appendElements(_request.toBSON()); + + const auto currPhase = [&]() { + stdx::lock_guard 
l{_docMutex}; + return _doc.getPhase(); + }(); BSONObjBuilder bob; bob.append("type", "op"); @@ -416,7 +422,7 @@ boost::optional<BSONObj> CreateCollectionCoordinator::reportForCurrentOp( bob.append("op", "command"); bob.append("ns", nss().toString()); bob.append("command", cmdBob.obj()); - bob.append("currentPhase", _doc.getPhase()); + bob.append("currentPhase", currPhase); bob.append("active", true); return bob.obj(); } @@ -430,8 +436,7 @@ void CreateCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) con "Another create collection with different arguments is already running for the same " "namespace", SimpleBSONObjComparator::kInstance.evaluate( - _doc.getCreateCollectionRequest().toBSON() == - otherDoc.getCreateCollectionRequest().toBSON())); + _request.toBSON() == otherDoc.getCreateCollectionRequest().toBSON())); } ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( @@ -439,7 +444,7 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( const CancellationToken& token) noexcept { return ExecutorFuture<void>(**executor) .then([this, anchor = shared_from_this()] { - _shardKeyPattern = ShardKeyPattern(*_doc.getShardKey()); + _shardKeyPattern = ShardKeyPattern(*_request.getShardKey()); if (_doc.getPhase() < Phase::kCommit) { auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); @@ -478,8 +483,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( opCtx, nss(), _shardKeyPattern->getKeyPattern().toBSON(), - getCollation(opCtx, nss(), _doc.getCollation()).second, - _doc.getUnique().value_or(false))) { + getCollation(opCtx, nss(), _request.getCollation()).second, + _request.getUnique().value_or(false))) { _checkCollectionUUIDMismatch(opCtx); // The critical section can still be held here if the node committed the @@ -532,8 +537,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl( audit::logShardCollection(opCtx->getClient(), nss().ns(), - *_doc.getShardKey(), - 
_doc.getUnique().value_or(false)); + *_request.getShardKey(), + _request.getUnique().value_or(false)); if (_splitPolicy->isOptimized()) { _createChunks(opCtx); @@ -646,7 +651,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx uassert(ErrorCodes::InvalidOptions, "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on " "the hashed field by declaring an additional (non-hashed) unique index on the field.", - !_shardKeyPattern->isHashedPattern() || !_doc.getUnique().value_or(false)); + !_shardKeyPattern->isHashedPattern() || !_request.getUnique().value_or(false)); // Ensure that a time-series collection cannot be sharded unless the feature flag is enabled. if (nss().isTimeseriesBucketsCollection()) { @@ -663,7 +668,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx !nss().isSystem() || nss() == NamespaceString::kLogicalSessionsNamespace || nss().isTemporaryReshardingCollection() || nss().isTimeseriesBucketsCollection()); - if (_doc.getNumInitialChunks()) { + if (_request.getNumInitialChunks()) { // Ensure numInitialChunks is within valid bounds. // Cannot have more than kMaxSplitPoints initial chunks per shard. 
Setting a maximum of // 1,000,000 chunks in total to limit the amount of memory this command consumes so there is @@ -672,7 +677,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx const int maxNumInitialChunksForShards = Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() * shardutil::kMaxSplitPoints; const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption - int numChunks = _doc.getNumInitialChunks().value(); + int numChunks = _request.getNumInitialChunks().value(); uassert(ErrorCodes::InvalidOptions, str::stream() << "numInitialChunks cannot be more than either: " << maxNumInitialChunksForShards << ", " << shardutil::kMaxSplitPoints @@ -704,7 +709,7 @@ void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx void CreateCollectionCoordinator::_checkCollectionUUIDMismatch(OperationContext* opCtx) const { AutoGetCollection coll{opCtx, nss(), MODE_IS}; - checkCollectionUUIDMismatch(opCtx, nss(), coll.getCollection(), _doc.getCollectionUUID()); + checkCollectionUUIDMismatch(opCtx, nss(), coll.getCollection(), _request.getCollectionUUID()); } void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) { @@ -712,12 +717,12 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = nss()); boost::optional<Collation> collation; - std::tie(collation, _collationBSON) = getCollation(opCtx, nss(), _doc.getCollation()); + std::tie(collation, _collationBSON) = getCollation(opCtx, nss(), _request.getCollation()); // We need to implicitly create a timeseries view and underlying bucket collection. 
- if (_collectionEmpty && _doc.getTimeseries()) { + if (_collectionEmpty && _request.getTimeseries()) { const auto viewName = nss().getTimeseriesViewNamespace(); - auto createCmd = makeCreateCommand(viewName, collation, _doc.getTimeseries().get()); + auto createCmd = makeCreateCommand(viewName, collation, _request.getTimeseries().get()); BSONObj createRes; DBDirectClient localClient(opCtx); @@ -737,14 +742,14 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* shardkeyutil::validateShardKeyIsNotEncrypted(opCtx, nss(), *_shardKeyPattern); auto indexCreated = false; - if (_doc.getImplicitlyCreateIndex().value_or(true)) { + if (_request.getImplicitlyCreateIndex().value_or(true)) { indexCreated = shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( opCtx, nss(), *_shardKeyPattern, _collationBSON, - _doc.getUnique().value_or(false), - _doc.getEnforceUniquenessCheck().value_or(true), + _request.getUnique().value_or(false), + _request.getEnforceUniquenessCheck().value_or(true), shardkeyutil::ValidationBehaviorsShardCollection(opCtx)); } else { uassert(6373200, @@ -753,8 +758,8 @@ void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* nss(), *_shardKeyPattern, _collationBSON, - _doc.getUnique().value_or(false) && - _doc.getEnforceUniquenessCheck().value_or(true), + _request.getUnique().value_or(false) && + _request.getEnforceUniquenessCheck().value_or(true), shardkeyutil::ValidationBehaviorsShardCollection(opCtx))); } @@ -782,9 +787,9 @@ void CreateCollectionCoordinator::_createPolicy(OperationContext* opCtx) { _splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy( opCtx, *_shardKeyPattern, - _doc.getNumInitialChunks() ? *_doc.getNumInitialChunks() : 0, - _doc.getPresplitHashedZones() ? *_doc.getPresplitHashedZones() : false, - _doc.getInitialSplitPoints(), + _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0, + _request.getPresplitHashedZones() ? 
*_request.getPresplitHashedZones() : false, + _request.getInitialSplitPoints(), getTagsAndValidate(opCtx, nss(), _shardKeyPattern->toBSON()), getNumShards(opCtx), *_collectionEmpty); @@ -878,9 +883,9 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { *_collectionUUID, _shardKeyPattern->getKeyPattern()); - if (_doc.getCreateCollectionRequest().getTimeseries()) { + if (_request.getTimeseries()) { TypeCollectionTimeseriesFields timeseriesFields; - timeseriesFields.setTimeseriesOptions(*_doc.getCreateCollectionRequest().getTimeseries()); + timeseriesFields.setTimeseriesOptions(*_request.getTimeseries()); coll.setTimeseriesFields(std::move(timeseriesFields)); } @@ -888,16 +893,15 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { coll.setDefaultCollation(_collationBSON.value()); } - if (_doc.getUnique()) { - coll.setUnique(*_doc.getUnique()); + if (_request.getUnique()) { + coll.setUnique(*_request.getUnique()); } _doc = _updateSession(opCtx, _doc); try { insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc)); - _writeOplogMessage( - opCtx, nss(), *_collectionUUID, _doc.getCreateCollectionRequest().toBSON()); + _writeOplogMessage(opCtx, nss(), *_collectionUUID, _request.toBSON()); LOGV2_DEBUG(5277907, 2, "Collection successfully committed", "namespace"_attr = nss()); @@ -959,7 +963,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { void CreateCollectionCoordinator::_logStartCreateCollection(OperationContext* opCtx) { BSONObjBuilder collectionDetail; - collectionDetail.append("shardKey", *_doc.getCreateCollectionRequest().getShardKey()); + collectionDetail.append("shardKey", *_request.getShardKey()); collectionDetail.append("collection", nss().ns()); collectionDetail.append("primary", ShardingState::get(opCtx)->shardId().toString()); ShardingLogging::get(opCtx)->logChange( @@ -993,10 +997,15 @@ void CreateCollectionCoordinator::_enterPhase(Phase newPhase) { "oldPhase"_attr = 
CreateCollectionCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } const BSONObj CreateCollectionCoordinatorDocumentPre60Compatible::kPre60IncompatibleFields = @@ -1023,7 +1032,7 @@ CreateCollectionCoordinatorPre60Compatible::CreateCollectionCoordinatorPre60Comp BSON("command" << "createCollection" << "ns" << nss().toString() << "request" - << _doc.getCreateCollectionRequest().toBSON().filterFieldsUndotted( + << _request.toBSON().filterFieldsUndotted( CreateCollectionCoordinatorDocumentPre60Compatible::kPre60IncompatibleFields, false))) {} diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h index 3cd5e9fee3f..565972afcb1 100644 --- a/src/mongo/db/s/create_collection_coordinator.h +++ b/src/mongo/db/s/create_collection_coordinator.h @@ -66,8 +66,11 @@ public: } protected: + mutable Mutex _docMutex = MONGO_MAKE_LATCH("CreateCollectionCoordinator::_docMutex"); CoordDoc _doc; + const mongo::CreateCollectionRequest _request; + private: ShardingDDLCoordinatorMetadata const& metadata() const override { return _doc.getShardingDDLCoordinatorMetadata(); diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index 0ac0c031d2d..a9591eae302 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -59,13 +59,19 @@ boost::optional<BSONObj> DropCollectionCoordinator::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } + + 
const auto currPhase = [&]() { + stdx::lock_guard l{_docMutex}; + return _doc.getPhase(); + }(); + BSONObjBuilder bob; bob.append("type", "op"); bob.append("desc", "DropCollectionCoordinator"); bob.append("op", "command"); bob.append("ns", nss().toString()); bob.append("command", cmdBob.obj()); - bob.append("currentPhase", _doc.getPhase()); + bob.append("currentPhase", currPhase); bob.append("active", true); return bob.obj(); } @@ -105,10 +111,15 @@ void DropCollectionCoordinator::_enterPhase(Phase newPhase) { "oldPhase"_attr = DropCollectionCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } ExecutorFuture<void> DropCollectionCoordinator::_runImpl( diff --git a/src/mongo/db/s/drop_collection_coordinator.h b/src/mongo/db/s/drop_collection_coordinator.h index 1f43b204de4..140013e41e1 100644 --- a/src/mongo/db/s/drop_collection_coordinator.h +++ b/src/mongo/db/s/drop_collection_coordinator.h @@ -33,7 +33,6 @@ #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/drop_collection_coordinator_document_gen.h" #include "mongo/db/s/sharding_ddl_coordinator.h" - namespace mongo { class DropCollectionCoordinator final : public ShardingDDLCoordinator { @@ -83,6 +82,7 @@ private: void _enterPhase(Phase newPhase); + mutable Mutex _docMutex = MONGO_MAKE_LATCH("DropCollectionCoordinator::_docMutex"); DropCollectionCoordinatorDocument _doc; }; diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp index 35d66821203..3645f186d72 100644 --- a/src/mongo/db/s/drop_database_coordinator.cpp +++ 
b/src/mongo/db/s/drop_database_coordinator.cpp @@ -156,13 +156,19 @@ boost::optional<BSONObj> DropDatabaseCoordinator::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } + + const auto currPhase = [&]() { + stdx::lock_guard l{_docMutex}; + return _doc.getPhase(); + }(); + BSONObjBuilder bob; bob.append("type", "op"); bob.append("desc", "DropDatabaseCoordinator"); bob.append("op", "command"); bob.append("ns", nss().toString()); bob.append("command", cmdBob.obj()); - bob.append("currentPhase", _doc.getPhase()); + bob.append("currentPhase", currPhase); bob.append("active", true); return bob.obj(); } @@ -179,10 +185,15 @@ void DropDatabaseCoordinator::_enterPhase(Phase newPhase) { "oldPhase"_attr = DropDatabaseCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } void DropDatabaseCoordinator::_clearDatabaseInfoOnPrimary(OperationContext* opCtx) { diff --git a/src/mongo/db/s/drop_database_coordinator.h b/src/mongo/db/s/drop_database_coordinator.h index 8e81f19b79c..47d63310a19 100644 --- a/src/mongo/db/s/drop_database_coordinator.h +++ b/src/mongo/db/s/drop_database_coordinator.h @@ -50,6 +50,7 @@ public: private: ShardingDDLCoordinatorMetadata const& metadata() const override { + stdx::lock_guard l{_docMutex}; return _doc.getShardingDDLCoordinatorMetadata(); } @@ -83,7 +84,10 @@ private: void _clearDatabaseInfoOnSecondaries(OperationContext* opCtx); + mutable Mutex _docMutex = MONGO_MAKE_LATCH("DropDatabaseCoordinator::_docMutex"); DropDatabaseCoordinatorDocument _doc; + + 
StringData _dbName; }; diff --git a/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp b/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp index 3d90db4e497..1598e7f286a 100644 --- a/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp +++ b/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp @@ -56,6 +56,7 @@ RefineCollectionShardKeyCoordinator::RefineCollectionShardKeyCoordinator( : ShardingDDLCoordinator(service, initialState), _doc(RefineCollectionShardKeyCoordinatorDocument::parse( IDLParserErrorContext("RefineCollectionShardKeyCoordinatorDocument"), initialState)), + _request(_doc.getRefineCollectionShardKeyRequest()), _newShardKey(_doc.getNewShardKey()), _persistCoordinatorDocument(persistCoordinatorDocument) {} @@ -68,8 +69,7 @@ void RefineCollectionShardKeyCoordinator::checkIfOptionsConflict(const BSONObj& "Another refine collection with different arguments is already running for the same " "namespace", SimpleBSONObjComparator::kInstance.evaluate( - _doc.getRefineCollectionShardKeyRequest().toBSON() == - otherDoc.getRefineCollectionShardKeyRequest().toBSON())); + _request.toBSON() == otherDoc.getRefineCollectionShardKeyRequest().toBSON())); } boost::optional<BSONObj> RefineCollectionShardKeyCoordinator::reportForCurrentOp( @@ -79,7 +79,7 @@ boost::optional<BSONObj> RefineCollectionShardKeyCoordinator::reportForCurrentOp if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } - cmdBob.appendElements(_doc.getRefineCollectionShardKeyRequest().toBSON()); + cmdBob.appendElements(_request.toBSON()); BSONObjBuilder bob; bob.append("type", "op"); @@ -108,10 +108,15 @@ void RefineCollectionShardKeyCoordinator::_enterPhase(Phase newPhase) { "oldPhase"_attr = RefineCollectionShardKeyCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = 
_insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl( @@ -128,7 +133,7 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl( { AutoGetCollection coll{ opCtx, nss(), MODE_IS, AutoGetCollectionViewMode::kViewsPermitted}; - checkCollectionUUIDMismatch(opCtx, nss(), *coll, _doc.getCollectionUUID()); + checkCollectionUUIDMismatch(opCtx, nss(), *coll, _request.getCollectionUUID()); } shardkeyutil::validateShardKeyIsNotEncrypted( @@ -141,7 +146,7 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl( nss(), _newShardKey.toBSON(), cm.getVersion().epoch()); configsvrRefineCollShardKey.setDbName(nss().db().toString()); configsvrRefineCollShardKey.setEnforceUniquenessCheck( - _doc.getEnforceUniquenessCheck()); + _request.getEnforceUniquenessCheck()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); if (_persistCoordinatorDocument) { diff --git a/src/mongo/db/s/refine_collection_shard_key_coordinator.h b/src/mongo/db/s/refine_collection_shard_key_coordinator.h index 58155b5eed5..b54fb1290e6 100644 --- a/src/mongo/db/s/refine_collection_shard_key_coordinator.h +++ b/src/mongo/db/s/refine_collection_shard_key_coordinator.h @@ -81,7 +81,11 @@ private: void _enterPhase(Phase newPhase); + mutable Mutex _docMutex = MONGO_MAKE_LATCH("RefineCollectionShardKeyCoordinator::_docMutex"); RefineCollectionShardKeyCoordinatorDocument _doc; + + const mongo::RefineCollectionShardKeyRequest _request; + const KeyPattern _newShardKey; const bool _persistCoordinatorDocument; // TODO: SERVER-62850 remove this then 6.0 branches out }; diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp 
b/src/mongo/db/s/rename_collection_coordinator.cpp index 47db3dc63d7..59048ce4c0f 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -90,13 +90,14 @@ RenameCollectionCoordinator::RenameCollectionCoordinator(ShardingDDLCoordinatorS const BSONObj& initialState) : ShardingDDLCoordinator(service, initialState), _doc(RenameCollectionCoordinatorDocument::parse( - IDLParserErrorContext("RenameCollectionCoordinatorDocument"), initialState)) {} + IDLParserErrorContext("RenameCollectionCoordinatorDocument"), initialState)), + _request(_doc.getRenameCollectionRequest()) {} void RenameCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { const auto otherDoc = RenameCollectionCoordinatorDocument::parse( IDLParserErrorContext("RenameCollectionCoordinatorDocument"), doc); - const auto& selfReq = _doc.getRenameCollectionRequest().toBSON(); + const auto& selfReq = _request.toBSON(); const auto& otherReq = otherDoc.getRenameCollectionRequest().toBSON(); uassert(ErrorCodes::ConflictingOperationInProgress, @@ -107,7 +108,7 @@ void RenameCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) con std::vector<StringData> RenameCollectionCoordinator::_acquireAdditionalLocks( OperationContext* opCtx) { - return {_doc.getTo().ns()}; + return {_request.getTo().ns()}; } boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp( @@ -118,7 +119,12 @@ boost::optional<BSONObj> RenameCollectionCoordinator::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } - cmdBob.appendElements(_doc.getRenameCollectionRequest().toBSON()); + cmdBob.appendElements(_request.toBSON()); + + const auto currPhase = [&]() { + stdx::lock_guard l{_docMutex}; + return _doc.getPhase(); + }(); BSONObjBuilder bob; bob.append("type", "op"); @@ -126,7 +132,7 @@ boost::optional<BSONObj> 
RenameCollectionCoordinator::reportForCurrentOp( bob.append("op", "command"); bob.append("ns", nss().toString()); bob.append("command", cmdBob.obj()); - bob.append("currentPhase", _doc.getPhase()); + bob.append("currentPhase", currPhase); bob.append("active", true); return bob.obj(); } @@ -139,15 +145,20 @@ void RenameCollectionCoordinator::_enterPhase(Phase newPhase) { 2, "Rename collection coordinator phase transition", "fromNs"_attr = nss(), - "toNs"_attr = _doc.getTo(), + "toNs"_attr = _request.getTo(), "newPhase"_attr = RenameCollectionCoordinatorPhase_serializer(newDoc.getPhase()), "oldPhase"_attr = RenameCollectionCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; + _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( @@ -162,7 +173,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); const auto& fromNss = nss(); - const auto& toNss = _doc.getTo(); + const auto& toNss = _request.getTo(); try { uassert(ErrorCodes::InvalidOptions, @@ -236,7 +247,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( getForwardableOpMetadata().setOn(opCtx); const auto& fromNss = nss(); - const auto& toNss = _doc.getTo(); + const auto& toNss = _request.getTo(); ShardingLogging::get(opCtx)->logChange( opCtx, @@ -281,8 +292,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( fromNss, _doc.getSourceUUID().get()); renameCollParticipantRequest.setDbName(fromNss.db()); renameCollParticipantRequest.setTargetUUID(_doc.getTargetUUID()); - renameCollParticipantRequest.setRenameCollectionRequest( - 
_doc.getRenameCollectionRequest()); + renameCollParticipantRequest.setRenameCollectionRequest(_request); auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); // We need to send the command to all the shards because both @@ -319,7 +329,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( opCtx, getCurrentSession(_doc), **executor); } - ConfigsvrRenameCollectionMetadata req(nss(), _doc.getTo()); + ConfigsvrRenameCollectionMetadata req(nss(), _request.getTo()); req.setOptFromCollection(_doc.getOptShardedCollInfo()); const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(req.toBSON({})); const auto& configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); @@ -366,8 +376,7 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( ShardsvrRenameCollectionUnblockParticipant unblockParticipantRequest( fromNss, _doc.getSourceUUID().get()); unblockParticipantRequest.setDbName(fromNss.db()); - unblockParticipantRequest.setRenameCollectionRequest( - _doc.getRenameCollectionRequest()); + unblockParticipantRequest.setRenameCollectionRequest(_request); auto const cmdObj = CommandHelpers::appendMajorityWriteConcern( unblockParticipantRequest.toBSON({})); auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); @@ -389,28 +398,29 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( opCtx, fromNss.db(), cmdObj, participants, **executor); } })) - .then(_executePhase( - Phase::kSetResponse, - [this, anchor = shared_from_this()] { - auto opCtxHolder = cc().makeOperationContext(); - auto* opCtx = opCtxHolder.get(); - getForwardableOpMetadata().setOn(opCtx); - - // Retrieve the new collection version - const auto catalog = Grid::get(opCtx)->catalogCache(); - const auto cm = uassertStatusOK( - catalog->getCollectionRoutingInfoWithRefresh(opCtx, _doc.getTo())); - _response = RenameCollectionResponse(cm.isSharded() ? 
cm.getVersion() - : ChunkVersion::UNSHARDED()); - - ShardingLogging::get(opCtx)->logChange( - opCtx, - "renameCollection.end", - nss().ns(), - BSON("source" << nss().toString() << "destination" << _doc.getTo().toString()), - ShardingCatalogClient::kMajorityWriteConcern); - LOGV2(5460504, "Collection renamed", "namespace"_attr = nss()); - })) + .then(_executePhase(Phase::kSetResponse, + [this, anchor = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + getForwardableOpMetadata().setOn(opCtx); + + // Retrieve the new collection version + const auto catalog = Grid::get(opCtx)->catalogCache(); + const auto cm = + uassertStatusOK(catalog->getCollectionRoutingInfoWithRefresh( + opCtx, _request.getTo())); + _response = RenameCollectionResponse( + cm.isSharded() ? cm.getVersion() : ChunkVersion::UNSHARDED()); + + ShardingLogging::get(opCtx)->logChange( + opCtx, + "renameCollection.end", + nss().ns(), + BSON("source" << nss().toString() << "destination" + << _request.getTo().toString()), + ShardingCatalogClient::kMajorityWriteConcern); + LOGV2(5460504, "Collection renamed", "namespace"_attr = nss()); + })) .onError([this, anchor = shared_from_this()](const Status& status) { if (!status.isA<ErrorCategory::NotPrimaryError>() && !status.isA<ErrorCategory::ShutdownError>()) { diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h index 0b1725f39e4..af395745001 100644 --- a/src/mongo/db/s/rename_collection_coordinator.h +++ b/src/mongo/db/s/rename_collection_coordinator.h @@ -91,9 +91,11 @@ private: void _enterPhase(Phase newPhase); + mutable Mutex _docMutex = MONGO_MAKE_LATCH("RenameCollectionCoordinator::_docMutex"); RenameCollectionCoordinatorDocument _doc; boost::optional<RenameCollectionResponse> _response; + const RenameCollectionRequest _request; }; } // namespace mongo diff --git a/src/mongo/db/s/reshard_collection_coordinator.cpp 
b/src/mongo/db/s/reshard_collection_coordinator.cpp index 8dc1765d8d1..802a85cfe1d 100644 --- a/src/mongo/db/s/reshard_collection_coordinator.cpp +++ b/src/mongo/db/s/reshard_collection_coordinator.cpp @@ -52,6 +52,7 @@ ReshardCollectionCoordinator::ReshardCollectionCoordinator(ShardingDDLCoordinato _initialState(initialState.getOwned()), _doc(ReshardCollectionCoordinatorDocument::parse( IDLParserErrorContext("ReshardCollectionCoordinatorDocument"), _initialState)), + _request(_doc.getReshardCollectionRequest()), _persistCoordinatorDocument(persistCoordinatorDocument) {} void ReshardCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) const { @@ -62,8 +63,7 @@ void ReshardCollectionCoordinator::checkIfOptionsConflict(const BSONObj& doc) co "Another reshard collection with different arguments is already running for the same " "namespace", SimpleBSONObjComparator::kInstance.evaluate( - _doc.getReshardCollectionRequest().toBSON() == - otherDoc.getReshardCollectionRequest().toBSON())); + _request.toBSON() == otherDoc.getReshardCollectionRequest().toBSON())); } boost::optional<BSONObj> ReshardCollectionCoordinator::reportForCurrentOp( @@ -73,7 +73,7 @@ boost::optional<BSONObj> ReshardCollectionCoordinator::reportForCurrentOp( if (const auto& optComment = getForwardableOpMetadata().getComment()) { cmdBob.append(optComment.get().firstElement()); } - cmdBob.appendElements(_doc.getReshardCollectionRequest().toBSON()); + cmdBob.appendElements(_request.toBSON()); BSONObjBuilder bob; bob.append("type", "op"); @@ -101,10 +101,15 @@ void ReshardCollectionCoordinator::_enterPhase(Phase newPhase) { "oldPhase"_attr = ReshardCollectionCoordinatorPhase_serializer(_doc.getPhase())); if (_doc.getPhase() == Phase::kUnset) { - _doc = _insertStateDocument(std::move(newDoc)); - return; + newDoc = _insertStateDocument(std::move(newDoc)); + } else { + newDoc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); + } + + { + stdx::unique_lock ul{_docMutex}; 
+ _doc = std::move(newDoc); } - _doc = _updateStateDocument(cc().makeOperationContext().get(), std::move(newDoc)); } ExecutorFuture<void> ReshardCollectionCoordinator::_runImpl( diff --git a/src/mongo/db/s/reshard_collection_coordinator.h b/src/mongo/db/s/reshard_collection_coordinator.h index bd4639f056f..54d98ee03d1 100644 --- a/src/mongo/db/s/reshard_collection_coordinator.h +++ b/src/mongo/db/s/reshard_collection_coordinator.h @@ -55,6 +55,7 @@ protected: private: ShardingDDLCoordinatorMetadata const& metadata() const override { + stdx::lock_guard l{_docMutex}; return _doc.getShardingDDLCoordinatorMetadata(); } @@ -81,7 +82,11 @@ private: void _enterPhase(Phase newPhase); const BSONObj _initialState; + mutable Mutex _docMutex = MONGO_MAKE_LATCH("ReshardCollectionCoordinator::_docMutex"); ReshardCollectionCoordinatorDocument _doc; + + const mongo::ReshardCollectionRequest _request; + const bool _persistCoordinatorDocument; // TODO: SERVER-62338 remove this then 6.0 branches out }; |