diff options
Diffstat (limited to 'src/mongo')
79 files changed, 658 insertions, 689 deletions
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index 30dd68508eb..75435b458bc 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -271,7 +271,8 @@ void convertToCapped(OperationContext* opCtx, const NamespaceString& ns, long lo StringData shortSource = ns.coll(); AutoGetCollection coll(opCtx, ns, MODE_X); - CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, ns)->checkShardVersionOrThrow( + opCtx); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, ns); diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index e50e6d50595..ab2996045fd 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -75,9 +75,9 @@ void assertNoMovePrimaryInProgress(OperationContext* opCtx, NamespaceString cons try { auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( opCtx, nss.dbName(), DSSAcquisitionMode::kShared); + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss); - auto css = CollectionShardingState::get(opCtx, nss); - auto collDesc = css->getCollectionDescription(opCtx); + auto collDesc = scopedCss->getCollectionDescription(opCtx); collDesc.throwIfReshardingInProgress(nss); if (!collDesc.isSharded()) { @@ -757,7 +757,8 @@ Status _collModInternal(OperationContext* opCtx, // If a sharded time-series collection is dropped, it's possible that a stale mongos // sends the request on the buckets namespace instead of the view namespace. Ensure that // the shardVersion is upto date before throwing an error. - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); } checkCollectionUUIDMismatch(opCtx, nss, nullptr, cmd.getCollectionUUID()); return Status(ErrorCodes::NamespaceNotFound, "ns does not exist"); diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index 33c5ea93a1a..79fecf84cd7 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -179,7 +179,8 @@ Status _createView(OperationContext* opCtx, str::stream() << "Not primary while creating collection " << nss); } - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); if (collectionOptions.changeStreamPreAndPostImagesOptions.getEnabled()) { return Status(ErrorCodes::InvalidOptions, @@ -337,7 +338,8 @@ Status _createTimeseries(OperationContext* opCtx, str::stream() << "Not primary while creating collection " << ns); } - CollectionShardingState::get(opCtx, bucketsNs)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, bucketsNs) + ->checkShardVersionOrThrow(opCtx); WriteUnitOfWork wuow(opCtx); AutoStatsTracker bucketsStatsTracker( @@ -425,7 +427,8 @@ Status _createTimeseries(OperationContext* opCtx, str::stream() << "Not primary while creating collection " << ns}; } - CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, ns) + ->checkShardVersionOrThrow(opCtx); _createSystemDotViewsIfNecessary(opCtx, db); @@ -542,7 +545,8 @@ Status _createCollection( str::stream() << "Not primary while creating collection " << nss); } - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); WriteUnitOfWork wunit(opCtx); diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index 028f2eb644d..407ff29e3a2 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -284,7 +284,8 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx, // Serialize the drop with refreshes to prevent dropping a collection and creating the same // nss as a view while refreshing. - CollectionShardingState::get(opCtx, resolvedNss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, resolvedNss) + ->checkShardVersionOrThrow(opCtx); invariant(coll->getIndexCatalog()->numIndexesInProgress() == 0); diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp index 7ee4327a555..22b55c90e71 100644 --- a/src/mongo/db/catalog/drop_indexes.cpp +++ b/src/mongo/db/catalog/drop_indexes.cpp @@ -294,7 +294,8 @@ void dropReadyIndexes(OperationContext* opCtx, IndexCatalog* indexCatalog = collection->getIndexCatalog(); auto collDescription = - CollectionShardingState::get(opCtx, collection->ns())->getCollectionDescription(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, collection->ns()) + ->getCollectionDescription(opCtx); if (indexNames.front() == "*") { if (collDescription.isSharded() && !forceDropShardKeyIndex) { @@ -366,9 +367,9 @@ void assertNoMovePrimaryInProgress(OperationContext* opCtx, const NamespaceStrin try { auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( opCtx, nss.dbName(), DSSAcquisitionMode::kShared); + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss); - auto css = CollectionShardingState::get(opCtx, nss); - auto collDesc = css->getCollectionDescription(opCtx); + auto collDesc = scopedCss->getCollectionDescription(opCtx); collDesc.throwIfReshardingInProgress(nss); if (!collDesc.isSharded()) { @@ -510,17 +511,17 @@ DropIndexesReply dropIndexes(OperationContext* opCtx, // abort phase, a new identical index was created. auto indexCatalog = collection->getWritableCollection(opCtx)->getIndexCatalog(); for (const auto& indexName : indexNames) { - auto collDescription = - CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx); - - if (collDescription.isSharded()) { + auto collDesc = + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->getCollectionDescription(opCtx); + if (collDesc.isSharded()) { uassert(ErrorCodes::CannotDropShardKeyIndex, "Cannot drop the only compatible index for this collection's shard key", !isLastShardKeyIndex(opCtx, collection->getCollection(), indexCatalog, indexName, - collDescription.getKeyPattern())); + collDesc.getKeyPattern())); } auto desc = diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index 468c5d48fca..d82893ea8cf 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -34,7 +34,6 @@ #include "mongo/db/catalog/collection_uuid_mismatch.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" @@ -296,10 +295,10 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx, // table are consistent with the read request's shardVersion. // // Note: sharding versioning for an operation has no concept of multiple collections. - auto css = CollectionShardingState::getSharedForLockFreeReads(opCtx, _resolvedNss); - css->checkShardVersionOrThrow(opCtx); + auto scopedCss = CollectionShardingState::acquire(opCtx, _resolvedNss); + scopedCss->checkShardVersionOrThrow(opCtx); - auto collDesc = css->getCollectionDescription(opCtx); + auto collDesc = scopedCss->getCollectionDescription(opCtx); if (collDesc.isSharded()) { _coll.setShardKeyPattern(collDesc.getKeyPattern()); } @@ -440,8 +439,8 @@ AutoGetCollectionLockFree::AutoGetCollectionLockFree(OperationContext* opCtx, // operation. The shardVersion will be checked later if the shard filtering metadata is // fetched, ensuring both that the collection description info fetched here and the routing // table are consistent with the read request's shardVersion. - auto css = CollectionShardingState::getSharedForLockFreeReads(opCtx, _collection->ns()); - auto collDesc = css->getCollectionDescription(opCtx); + auto scopedCss = CollectionShardingState::acquire(opCtx, _collection->ns()); + auto collDesc = scopedCss->getCollectionDescription(opCtx); if (collDesc.isSharded()) { _collectionPtr.setShardKeyPattern(collDesc.getKeyPattern()); } diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 194ebe36ff1..c1cd599a6f7 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -27,9 +27,6 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/db/auth/authorization_checks.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" @@ -38,7 +35,6 @@ #include "mongo/db/curop.h" #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/db_raii.h" -#include "mongo/db/exec/count.h" #include "mongo/db/fle_crud.h" #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/query/collection_query_info.h" @@ -48,19 +44,13 @@ #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/view_response_formatter.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/views/resolved_view.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand - namespace mongo { namespace { -using std::string; -using std::stringstream; -using std::unique_ptr; - // Failpoint which causes to hang "count" cmd after acquiring the DB lock. MONGO_FAIL_POINT_DEFINE(hangBeforeCollectionCount); @@ -205,7 +195,7 @@ public: boost::optional<ScopedCollectionFilter> rangePreserver; if (collection.isSharded()) { rangePreserver.emplace( - CollectionShardingState::getSharedForLockFreeReads(opCtx, nss) + CollectionShardingState::acquire(opCtx, nss) ->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup)); @@ -281,7 +271,7 @@ public: boost::optional<ScopedCollectionFilter> rangePreserver; if (collection.isSharded()) { rangePreserver.emplace( - CollectionShardingState::getSharedForLockFreeReads(opCtx, nss) + CollectionShardingState::acquire(opCtx, nss) ->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup)); diff --git a/src/mongo/db/commands/create_indexes_cmd.cpp b/src/mongo/db/commands/create_indexes_cmd.cpp index bad9b865ac1..312962b1ac8 100644 --- a/src/mongo/db/commands/create_indexes_cmd.cpp +++ b/src/mongo/db/commands/create_indexes_cmd.cpp @@ -325,7 +325,9 @@ void assertNoMovePrimaryInProgress(OperationContext* opCtx, const NamespaceStrin Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - auto collDesc = CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx); + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss); + + auto collDesc = scopedCss->getCollectionDescription(opCtx); if (!collDesc.isSharded()) { if (scopedDss->isMovePrimaryInProgress()) { LOGV2(4909200, "assertNoMovePrimaryInProgress", "namespace"_attr = nss.toString()); @@ -505,7 +507,8 @@ CreateIndexesReply runCreateIndexesWithCoordinator(OperationContext* opCtx, ns, MODE_IX, AutoGetCollection::Options{}.expectedUUID(cmd.getCollectionUUID())); - CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, ns) + ->checkShardVersionOrThrow(opCtx); // Before potentially taking an exclusive collection lock, check if all indexes already // exist while holding an intent lock. diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index ae8172b1494..274d51f8d78 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -286,9 +286,8 @@ public: AutoGetCollectionForReadCommand collection(opCtx, nss); - const auto collDesc = - CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx); - + auto collDesc = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->getCollectionDescription(opCtx); if (collDesc.isSharded()) { const ShardKeyPattern shardKeyPattern(collDesc.getKeyPattern()); uassert(ErrorCodes::BadValue, diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 4749e7603a6..52fe43b3a6f 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -220,7 +220,8 @@ void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& nss) << nss.ns(), repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); } void recordStatsForTopCommand(OperationContext* opCtx) { @@ -550,7 +551,6 @@ void CmdFindAndModify::Invocation::doCheckAuthorization(OperationContext* opCtx) void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) { - validate(request()); const BSONObj& cmdObj = request().toBSON(BSONObj() /* commandPassthroughFields */); @@ -586,7 +586,8 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, str::stream() << "database " << dbName << " does not exist", collection.getDb()); - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); const auto exec = uassertStatusOK( getExecutorDelete(opDebug, &collection.getCollection(), &parsedDelete, verbosity)); @@ -610,7 +611,8 @@ void CmdFindAndModify::Invocation::explain(OperationContext* opCtx, str::stream() << "database " << dbName << " does not exist", collection.getDb()); - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); const auto exec = uassertStatusOK( getExecutorUpdate(opDebug, &collection.getCollection(), &parsedUpdate, verbosity)); diff --git a/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp index 63a92b848a5..5ef20f823f0 100644 --- a/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp +++ b/src/mongo/db/commands/internal_rename_if_options_and_indexes_match_cmd.cpp @@ -44,7 +44,9 @@ MONGO_FAIL_POINT_DEFINE(blockBeforeInternalRenameIfOptionsAndIndexesMatch); bool isCollectionSharded(OperationContext* opCtx, const NamespaceString& nss) { AutoGetCollectionForRead lock(opCtx, nss); return opCtx->writesAreReplicated() && - CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx).isSharded(); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->getCollectionDescription(opCtx) + .isSharded(); } /** diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 7c812bd575e..e8da7e6529d 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -1012,7 +1012,8 @@ public: const NamespaceString& bucketsNs) { AutoGetCollectionForRead coll(opCtx, bucketsNs); auto collDesc = - CollectionShardingState::get(opCtx, bucketsNs)->getCollectionDescription(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, bucketsNs) + ->getCollectionDescription(opCtx); if (collDesc.isSharded()) { tassert(6102801, "Sharded time-series buckets collection is missing time-series fields", diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 46d385a5a8d..295b0eea05d 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -27,9 +27,6 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/db/db_raii.h" #include "mongo/db/catalog/catalog_helper.h" @@ -39,20 +36,18 @@ #include "mongo/db/curop.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/storage/snapshot_helper.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage +namespace mongo { +namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeAutoGetShardVersionCheck); MONGO_FAIL_POINT_DEFINE(reachedAutoGetLockFreeShardConsistencyRetry); -namespace mongo { -namespace { - const boost::optional<int> kDoNotChangeProfilingLevel = boost::none; // TODO (SERVER-69813): Get rid of this when ShardServerCatalogCacheLoader will be removed. @@ -139,7 +134,7 @@ bool isSecondaryNssAView(OperationContext* opCtx, const NamespaceString& nss) { * Returns true if 'nss' is sharded. False otherwise. */ bool isSecondaryNssSharded(OperationContext* opCtx, const NamespaceString& nss) { - return CollectionShardingState::getSharedForLockFreeReads(opCtx, nss) + return CollectionShardingState::acquire(opCtx, nss) ->getCollectionDescription(opCtx) .isSharded(); } @@ -822,9 +817,8 @@ AutoGetCollectionForReadCommandBase<AutoGetCollectionForReadType>:: }); if (!_autoCollForRead.getView()) { - auto css = - CollectionShardingState::getSharedForLockFreeReads(opCtx, _autoCollForRead.getNss()); - css->checkShardVersionOrThrow(opCtx); + auto scopedCss = CollectionShardingState::acquire(opCtx, _autoCollForRead.getNss()); + scopedCss->checkShardVersionOrThrow(opCtx); } } @@ -884,7 +878,8 @@ OldClientContext::OldClientContext(OperationContext* opCtx, case dbDelete: // path, so no need to check them here as well break; default: - CollectionShardingState::get(_opCtx, nss)->checkShardVersionOrThrow(_opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(_opCtx, nss) + ->checkShardVersionOrThrow(_opCtx); break; } } diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 6625911054b..63bf1087791 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -163,8 +163,10 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, // Documents coming directly from users should be validated for storage. It is safe to // access the CollectionShardingState in this write context and to throw SSV if the sharding // metadata has not been initialized. - const auto collDesc = CollectionShardingState::get(opCtx(), collection()->ns()) - ->getCollectionDescription(opCtx()); + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx(), collection()->ns()); + auto collDesc = scopedCss->getCollectionDescription(opCtx()); + if (collDesc.isSharded() && !OperationShardingState::isComingFromRouter(opCtx())) { immutablePaths.fillFrom(collDesc.getKeyPatternFields()); } @@ -238,9 +240,10 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, args.stmtIds = request->getStmtIds(); args.update = logObj; if (_isUserInitiatedWrite) { - args.criteria = CollectionShardingState::get(opCtx(), collection()->ns()) - ->getCollectionDescription(opCtx()) - .extractDocumentKey(newObj); + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire( + opCtx(), collection()->ns()); + auto collDesc = scopedCss->getCollectionDescription(opCtx()); + args.criteria = collDesc.extractDocumentKey(newObj); } else { const auto docId = newObj[idFieldName]; args.criteria = docId ? docId.wrap() : newObj; @@ -714,9 +717,10 @@ void UpdateStage::_checkRestrictionsOnUpdatingShardKeyAreNotViolated( } bool UpdateStage::wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, - const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj) { + const auto& collDesc = shardingWriteRouter.getCollDesc(); + auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); if (!reshardingKeyPattern) return false; @@ -746,19 +750,11 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj const Snapshotted<BSONObj>& oldObj) { ShardingWriteRouter shardingWriteRouter( opCtx(), collection()->ns(), Grid::get(opCtx())->catalogCache()); - auto* const css = shardingWriteRouter.getCss(); - - // css can be null when this is a config server. - if (css == nullptr) { - return false; - } - - const auto collDesc = css->getCollectionDescription(opCtx()); // Calling mutablebson::Document::getObject() renders a full copy of the updated document. This // can be expensive for larger documents, so we skip calling it when the collection isn't even // sharded. - if (!collDesc.isSharded()) { + if (!shardingWriteRouter.getCollDesc().isSharded()) { return false; } @@ -766,21 +762,18 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj // It is possible that both the existing and new shard keys are being updated, so we do not want // to short-circuit checking whether either is being modified. - const auto existingShardKeyUpdated = - wasExistingShardKeyUpdated(shardingWriteRouter, collDesc, newObj, oldObj); - const auto reshardingKeyUpdated = - wasReshardingKeyUpdated(shardingWriteRouter, collDesc, newObj, oldObj); + bool existingShardKeyUpdated = wasExistingShardKeyUpdated(shardingWriteRouter, newObj, oldObj); + bool reshardingKeyUpdated = wasReshardingKeyUpdated(shardingWriteRouter, newObj, oldObj); return existingShardKeyUpdated || reshardingKeyUpdated; } bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, - const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj) { - auto* const css = shardingWriteRouter.getCss(); + const auto& collDesc = shardingWriteRouter.getCollDesc(); + const auto& shardKeyPattern = collDesc.getShardKeyPattern(); - const ShardKeyPattern& shardKeyPattern = collDesc.getShardKeyPattern(); auto oldShardKey = shardKeyPattern.extractShardKeyFromDoc(oldObj.value()); auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj); @@ -801,6 +794,7 @@ bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& sharding // At this point we already asserted that the complete shardKey have been specified in the // query, this implies that mongos is not doing a broadcast update and that it attached a // shardVersion to the command. Thus it is safe to call getOwnershipFilter + auto* const css = shardingWriteRouter.getCss(); const auto collFilter = css->getOwnershipFilter( opCtx(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h index a8acea04acc..74885df5a6c 100644 --- a/src/mongo/db/exec/update_stage.h +++ b/src/mongo/db/exec/update_stage.h @@ -187,12 +187,10 @@ private: * returns true. If the update does not change shard key fields, returns false. */ bool wasExistingShardKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, - const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj); bool wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, - const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj); diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index 5e48a76b826..f9b9538a8db 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -130,9 +130,10 @@ void UpsertStage::_performInsert(BSONObj newDocument) { // 'q' field belong to this shard, but those in the 'u' field do not. In this case we need to // throw so that MongoS can target the insert to the correct shard. if (_isUserInitiatedWrite) { - auto* const css = CollectionShardingState::get(opCtx(), collection()->ns()); - if (css->getCollectionDescription(opCtx()).isSharded()) { - const auto collFilter = css->getOwnershipFilter( + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx(), collection()->ns()); + if (scopedCss->getCollectionDescription(opCtx()).isSharded()) { + auto collFilter = scopedCss->getOwnershipFilter( opCtx(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); const ShardKeyPattern& shardKeyPattern = collFilter.getShardKeyPattern(); auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument); @@ -203,7 +204,8 @@ BSONObj UpsertStage::_produceNewDocumentForInsert() { FieldRefSet shardKeyPaths, immutablePaths; if (_isUserInitiatedWrite) { - optCollDesc = CollectionShardingState::get(opCtx(), _params.request->getNamespaceString()) + optCollDesc = CollectionShardingState::assertCollectionLockedAndAcquire( + opCtx(), _params.request->getNamespaceString()) ->getCollectionDescription(opCtx()); // If the collection is sharded, add all fields from the shard key to the 'shardKeyPaths' diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp index 28ffbe96b86..1fa04f729b2 100644 --- a/src/mongo/db/exec/write_stage_common.cpp +++ b/src/mongo/db/exec/write_stage_common.cpp @@ -76,8 +76,9 @@ PreWriteFilter::Action PreWriteFilter::computeAction(const Document& doc) { bool PreWriteFilter::_documentBelongsToMe(const BSONObj& doc) { if (!_shardFilterer) { _shardFilterer = [&] { - const auto css{CollectionShardingState::get(_opCtx, _nss)}; - return std::make_unique<ShardFiltererImpl>(css->getOwnershipFilter( + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(_opCtx, _nss); + return std::make_unique<ShardFiltererImpl>(scopedCss->getOwnershipFilter( _opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup, true /*supportNonVersionedOperations*/)); diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp index 6da862c5d00..84852c7c9fb 100644 --- a/src/mongo/db/index_builds_coordinator.cpp +++ b/src/mongo/db/index_builds_coordinator.cpp @@ -116,7 +116,8 @@ void checkShardKeyRestrictions(OperationContext* opCtx, const BSONObj& newIdxKey) { CollectionCatalog::get(opCtx)->invariantHasExclusiveAccessToCollection(opCtx, nss); - const auto collDesc = CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx); + const auto collDesc = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->getCollectionDescription(opCtx); if (!collDesc.isSharded()) return; @@ -1916,26 +1917,28 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(OperationContext* opCtx, AutoGetCollection autoColl(opCtx, nssOrUuid, MODE_X); CollectionWriter collection(opCtx, autoColl); - const auto& ns = collection.get()->ns(); - auto css = CollectionShardingState::get(opCtx, ns); + const auto& nss = collection.get()->ns(); - // Disallow index builds on drop-pending namespaces (system.drop.*) if we are primary. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (replCoord->getSettings().usingReplSets() && - replCoord->canAcceptWritesFor(opCtx, nssOrUuid)) { - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "drop-pending collection: " << ns, - !ns.isDropPendingNamespace()); - } + { + // Disallow index builds on drop-pending namespaces (system.drop.*) if we are primary. + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getSettings().usingReplSets() && + replCoord->canAcceptWritesFor(opCtx, nssOrUuid)) { + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "drop-pending collection: " << nss, + !nss.isDropPendingNamespace()); + } - // This check is for optimization purposes only as since this lock is released after this, - // and is acquired again when we build the index in _setUpIndexBuild. - css->checkShardVersionOrThrow(opCtx); - css->getCollectionDescription(opCtx).throwIfReshardingInProgress(ns); + // This check is for optimization purposes only as since this lock is released after this, + // and is acquired again when we build the index in _setUpIndexBuild. + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss); + scopedCss->checkShardVersionOrThrow(opCtx); + scopedCss->getCollectionDescription(opCtx).throwIfReshardingInProgress(nss); + } std::vector<BSONObj> filteredSpecs; try { - filteredSpecs = prepareSpecListForCreate(opCtx, collection.get(), ns, specs); + filteredSpecs = prepareSpecListForCreate(opCtx, collection.get(), nss, specs); } catch (const DBException& ex) { return ex.toStatus(); } @@ -1962,7 +1965,7 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(OperationContext* opCtx, // the catalog update when it uses the timestamp from the startIndexBuild, rather than // the commitIndexBuild, oplog entry. writeConflictRetry( - opCtx, "IndexBuildsCoordinator::_filterSpecsAndRegisterBuild", ns.ns(), [&] { + opCtx, "IndexBuildsCoordinator::_filterSpecsAndRegisterBuild", nss.ns(), [&] { WriteUnitOfWork wuow(opCtx); createIndexesOnEmptyCollection(opCtx, collection, filteredSpecs, false); wuow.commit(); @@ -2001,13 +2004,17 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild AutoGetCollection coll(opCtx, nssOrUuid, MODE_X); CollectionWriter collection(opCtx, coll); - CollectionShardingState::get(opCtx, collection->ns())->checkShardVersionOrThrow(opCtx); + + const auto& nss = collection.get()->ns(); + + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); // We will not have a start timestamp if we are newly a secondary (i.e. we started as // primary but there was a stepdown). We will be unable to timestamp the initial catalog write, // so we must fail the index build. During initial sync, there is no commit timestamp set. auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->canAcceptWritesFor(opCtx, collection->ns()) && + if (!replCoord->canAcceptWritesFor(opCtx, nss) && indexBuildOptions.applicationMode != ApplicationMode::kInitialSync) { uassert(ErrorCodes::NotWritablePrimary, str::stream() << "Replication state changed while setting up the index build: " @@ -2022,7 +2029,7 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild // writes a no-op just to generate an optime. onInitFn = [&](std::vector<BSONObj>& specs) { if (!(replCoord->getSettings().usingReplSets() && - replCoord->canAcceptWritesFor(opCtx, collection->ns()))) { + replCoord->canAcceptWritesFor(opCtx, nss))) { // Not primary. return Status::OK(); } @@ -2052,7 +2059,7 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild opCtx->getServiceContext()->getOpObserver()->onStartIndexBuild( opCtx, - collection->ns(), + nss, replState->collectionUUID, replState->buildUUID, replState->indexSpecs, @@ -2066,8 +2073,7 @@ IndexBuildsCoordinator::PostSetupAction IndexBuildsCoordinator::_setUpIndexBuild IndexBuildsManager::SetupOptions options; options.indexConstraints = - repl::ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, - collection->ns()) + repl::ReplicationCoordinator::get(opCtx)->shouldRelaxIndexConstraints(opCtx, nss) ? IndexBuildsManager::IndexConstraints::kRelax : IndexBuildsManager::IndexConstraints::kEnforce; options.protocol = replState->protocol; diff --git a/src/mongo/db/op_observer/op_observer_util.cpp b/src/mongo/db/op_observer/op_observer_util.cpp index 98b015dc0b3..21aae9284bc 100644 --- a/src/mongo/db/op_observer/op_observer_util.cpp +++ b/src/mongo/db/op_observer/op_observer_util.cpp @@ -109,7 +109,8 @@ DocumentKey getDocumentKey(OperationContext* opCtx, // if running on standalone or primary. Skip this completely on secondaries since they are // not expected to have the collection metadata cached. if (opCtx->writesAreReplicated()) { - auto collDesc = CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx); + auto collDesc = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->getCollectionDescription(opCtx); if (collDesc.isSharded()) { shardKey = dotted_path_support::extractElementsBasedOnTemplate(doc, collDesc.getKeyPattern()) diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index a3ba4c3c6e3..07a7e7bd7a7 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -224,7 +224,8 @@ void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& nss) repl::ReplicationCoordinator::get(opCtx->getServiceContext()) ->canAcceptWritesFor(opCtx, nss)); - CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->checkShardVersionOrThrow(opCtx); } void makeCollection(OperationContext* opCtx, const NamespaceString& ns) { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 04826cbaa78..9b49dc3659e 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -27,17 +27,11 @@ * it in the license file. */ -#include "mongo/db/query/projection_parser.h" - -#include "mongo/platform/basic.h" - #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/base/exact_cast.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/curop.h" @@ -84,6 +78,7 @@ #include "mongo/db/query/plan_executor_impl.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/planner_analysis.h" +#include "mongo/db/query/projection_parser.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/query_planner.h" @@ -103,7 +98,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery - namespace mongo { using boost::intrusive_ptr; @@ -532,8 +526,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRan TrialStage* trialStage = nullptr; - auto css = CollectionShardingState::get(opCtx, coll->ns()); - const auto isSharded = css->getCollectionDescription(opCtx).isSharded(); + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, coll->ns()); + const bool isSharded = scopedCss->getCollectionDescription(opCtx).isSharded(); // Because 'numRecords' includes orphan documents, our initial decision to optimize the $sample // cursor may have been mistaken. For sharded collections, build a TRIAL plan that will switch @@ -587,7 +581,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRan if (isSharded) { // In the sharded case, we need to use a ShardFilterer within the ARHASH plan to // eliminate orphans from the working set, since the stage owns the cursor. - maybeShardFilter = std::make_unique<ShardFiltererImpl>(css->getOwnershipFilter( + maybeShardFilter = std::make_unique<ShardFiltererImpl>(scopedCss->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup)); } @@ -610,7 +604,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRan if (isSharded) { // In the sharded case, we need to add a shard-filterer stage to the backup plan to // eliminate orphans. The trial plan is thus SHARDING_FILTER-COLLSCAN. - auto collectionFilter = css->getOwnershipFilter( + auto collectionFilter = scopedCss->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup); collScanPlan = std::make_unique<ShardFilterStage>( expCtx.get(), std::move(collectionFilter), ws.get(), std::move(collScanPlan)); @@ -652,7 +646,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRan // Since the incoming operation is sharded, use the CSS to infer the filtering metadata for // the collection. We get the shard ownership filter after checking to see if the collection // is sharded to avoid an invariant from being fired in this call. - auto collectionFilter = css->getOwnershipFilter( + auto collectionFilter = scopedCss->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup); // The trial plan is SHARDING_FILTER-MULTI_ITERATOR. auto randomCursorPlan = std::make_unique<ShardFilterStage>( diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 77662b6d276..8d65c41709e 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -166,7 +166,7 @@ BSONObj ShardServerProcessInterface::preparePipelineAndExplain( std::unique_ptr<ShardFilterer> ShardServerProcessInterface::getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const { auto collectionFilter = - CollectionShardingState::get(expCtx->opCtx, expCtx->ns) + CollectionShardingState::assertCollectionLockedAndAcquire(expCtx->opCtx, expCtx->ns) ->getOwnershipFilter( expCtx->opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup); diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp index 4404e2ab6da..a11db550e04 100644 --- a/src/mongo/db/query/classic_stage_builder.cpp +++ b/src/mongo/db/query/classic_stage_builder.cpp @@ -322,10 +322,11 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r const ShardingFilterNode* fn = static_cast<const ShardingFilterNode*>(root); auto childStage = build(fn->children[0].get()); - auto css = CollectionShardingState::get(_opCtx, _collection->ns()); + auto scopedCss = CollectionShardingState::assertCollectionLockedAndAcquire( + _opCtx, _collection->ns()); return std::make_unique<ShardFilterStage>( expCtx, - css->getOwnershipFilter( + scopedCss->getOwnershipFilter( _opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup), _ws, std::move(childStage)); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 195f4dadbea..0824723b4d6 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -861,7 +861,7 @@ protected: if (_plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { stage = std::make_unique<ShardFilterStage>( _cq->getExpCtxRaw(), - CollectionShardingState::get(_opCtx, _cq->nss()) + CollectionShardingState::assertCollectionLockedAndAcquire(_opCtx, _cq->nss()) ->getOwnershipFilter( _opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup), diff --git a/src/mongo/db/query/shard_filterer_factory_impl.cpp b/src/mongo/db/query/shard_filterer_factory_impl.cpp index f3d7c1d5945..74b53053de2 100644 --- a/src/mongo/db/query/shard_filterer_factory_impl.cpp +++ b/src/mongo/db/query/shard_filterer_factory_impl.cpp @@ -36,8 +36,9 @@ namespace mongo { std::unique_ptr<ShardFilterer> ShardFiltererFactoryImpl::makeShardFilterer( OperationContext* opCtx) const { - auto css = CollectionShardingState::get(opCtx, _collection->ns()); - return std::make_unique<ShardFiltererImpl>(css->getOwnershipFilter( + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, _collection->ns()); + return std::make_unique<ShardFiltererImpl>(scopedCss->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kDisallowOrphanCleanup)); } diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 227e0148478..b5feef2cd1b 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -228,10 +228,10 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex if (nss) { // Lock the collection so nothing changes while we're getting the migration report. AutoGetCollection autoColl(opCtx, nss.value(), MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss.value()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss.value(), CSRAcquisitionMode::kShared); - if (auto msm = MigrationSourceManager::get(csr, csrLock)) { + if (auto msm = MigrationSourceManager::get(*scopedCsr)) { return msm->getMigrationStatusReport(); } } diff --git a/src/mongo/db/s/chunk_operation_precondition_checks.cpp b/src/mongo/db/s/chunk_operation_precondition_checks.cpp index 76266aef3c7..2eb6f6836a9 100644 --- a/src/mongo/db/s/chunk_operation_precondition_checks.cpp +++ b/src/mongo/db/s/chunk_operation_precondition_checks.cpp @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #include "mongo/db/catalog_raii.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" @@ -40,9 +41,9 @@ CollectionMetadata checkCollectionIdentity(OperationContext* opCtx, AutoGetCollection collection(opCtx, nss, MODE_IS); const auto shardId = ShardingState::get(opCtx)->shardId(); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - auto optMetadata = csr->getCurrentMetadataIfKnown(); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + auto optMetadata = scopedCsr->getCurrentMetadataIfKnown(); uassert(StaleConfigInfo(nss, ShardVersion::IGNORED() /* receivedVersion */, diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp index 2a122a284bd..f25bce6c50d 100644 --- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp @@ -34,7 +34,6 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" -#include "mongo/db/namespace_string.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" @@ -77,8 +76,9 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx, } collectionUuid.emplace(autoColl.getCollection()->uuid()); - auto* const csr = CollectionShardingRuntime::get(opCtx, ns); - const auto optCollDescr = csr->getCurrentMetadataIfKnown(); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, ns, CSRAcquisitionMode::kShared); + auto optCollDescr = scopedCsr->getCurrentMetadataIfKnown(); if (!optCollDescr || !optCollDescr->isSharded()) { LOGV2(4416001, "cleanupOrphaned skipping waiting for orphaned data cleanup because " diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index 1bdd899d5fb..1ec5b6be31f 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -116,8 +116,10 @@ protected: { AutoGetCollection autoColl(operationContext(), kNss, MODE_X); - CollectionShardingRuntime::get(operationContext(), kNss) - ->setFilteringMetadata(operationContext(), CollectionMetadata(cm, ShardId("0"))); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + operationContext(), kNss, CSRAcquisitionMode::kExclusive); + scopedCsr->setFilteringMetadata(operationContext(), + CollectionMetadata(cm, ShardId("0"))); } _manager = std::make_shared<MetadataManager>( @@ -154,8 +156,9 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsInTheFuture) { ShardVersion(metadata.getShardVersion(), boost::optional<CollectionIndexes>(boost::none)) /* shardVersion */, boost::none /* databaseVersion */}; - auto* const css = CollectionShardingState::get(operationContext(), kNss); - testFilterFn(css->getOwnershipFilter( + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(operationContext(), kNss); + testFilterFn(scopedCss->getOwnershipFilter( operationContext(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup)); } @@ -184,8 +187,9 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsInThePast) { ShardVersion(metadata.getShardVersion(), boost::optional<CollectionIndexes>(boost::none)) /* shardVersion */, boost::none /* databaseVersion */}; - auto* const css = CollectionShardingState::get(operationContext(), kNss); - testFilterFn(css->getOwnershipFilter( + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(operationContext(), kNss); + testFilterFn(scopedCss->getOwnershipFilter( operationContext(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup)); } @@ -222,8 +226,9 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsTooFarInThePastThrowsStal ShardVersion(metadata.getShardVersion(), boost::optional<CollectionIndexes>(boost::none)) /* shardVersion */, boost::none /* databaseVersion */}; - auto* const css = CollectionShardingState::get(operationContext(), kNss); - testFilterFn(css->getOwnershipFilter( + auto scopedCss = + CollectionShardingState::assertCollectionLockedAndAcquire(operationContext(), kNss); + testFilterFn(scopedCss->getOwnershipFilter( operationContext(), CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup)); } diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index a856a187524..89ceb0cfd4c 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -29,7 +29,6 @@ #include "mongo/db/s/collection_sharding_runtime.h" -#include "mongo/base/checked_cast.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/global_settings.h" #include "mongo/db/s/operation_sharding_state.h" @@ -74,6 +73,10 @@ boost::optional<ShardVersion> getOperationReceivedVersion(OperationContext* opCt } // namespace +CollectionShardingRuntime::ScopedCollectionShardingRuntime::ScopedCollectionShardingRuntime( + ScopedCollectionShardingState&& scopedCss) + : _scopedCss(std::move(scopedCss)) {} + CollectionShardingRuntime::CollectionShardingRuntime( ServiceContext* service, NamespaceString nss, @@ -81,18 +84,17 @@ CollectionShardingRuntime::CollectionShardingRuntime( : _serviceContext(service), _nss(std::move(nss)), _rangeDeleterExecutor(std::move(rangeDeleterExecutor)), - _stateChangeMutex(_nss.toString()), _metadataType(_nss.isNamespaceAlwaysUnsharded() ? MetadataType::kUnsharded : MetadataType::kUnknown) {} -CollectionShardingRuntime* CollectionShardingRuntime::get(OperationContext* opCtx, - const NamespaceString& nss) { - auto* const css = CollectionShardingState::get(opCtx, nss); - return checked_cast<CollectionShardingRuntime*>(css); -} - -CollectionShardingRuntime* CollectionShardingRuntime::get(CollectionShardingState* css) { - return checked_cast<CollectionShardingRuntime*>(css); +CollectionShardingRuntime::ScopedCollectionShardingRuntime +CollectionShardingRuntime::assertCollectionLockedAndAcquire(OperationContext* opCtx, + const NamespaceString& nss, + CSRAcquisitionMode mode) { + dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS)); + return ScopedCollectionShardingRuntime( + ScopedCollectionShardingState::acquireScopedCollectionShardingState( + opCtx, nss, mode == CSRAcquisitionMode::kShared ? MODE_IS : MODE_X)); } ScopedCollectionFilter CollectionShardingRuntime::getOwnershipFilter( @@ -161,8 +163,7 @@ void CollectionShardingRuntime::checkShardVersionOrThrow(OperationContext* opCtx (void)_getMetadataWithVersionCheckAt(opCtx, boost::none); } -void CollectionShardingRuntime::enterCriticalSectionCatchUpPhase(const CSRLock&, - const BSONObj& reason) { +void CollectionShardingRuntime::enterCriticalSectionCatchUpPhase(const BSONObj& reason) { _critSec.enterCriticalSectionCatchUpPhase(reason); if (_shardVersionInRecoverOrRefresh) { @@ -170,39 +171,30 @@ void CollectionShardingRuntime::enterCriticalSectionCatchUpPhase(const CSRLock&, } } -void CollectionShardingRuntime::enterCriticalSectionCommitPhase(const CSRLock&, - const BSONObj& reason) { +void CollectionShardingRuntime::enterCriticalSectionCommitPhase(const BSONObj& reason) { _critSec.enterCriticalSectionCommitPhase(reason); } void CollectionShardingRuntime::rollbackCriticalSectionCommitPhaseToCatchUpPhase( - const CSRLock&, const BSONObj& reason) { + const BSONObj& reason) { _critSec.rollbackCriticalSectionCommitPhaseToCatchUpPhase(reason); } -void CollectionShardingRuntime::exitCriticalSection(const CSRLock&, const BSONObj& reason) { +void CollectionShardingRuntime::exitCriticalSection(const BSONObj& reason) { _critSec.exitCriticalSection(reason); } -void CollectionShardingRuntime::exitCriticalSectionNoChecks(const CSRLock&) { +void CollectionShardingRuntime::exitCriticalSectionNoChecks() { _critSec.exitCriticalSectionNoChecks(); } boost::optional<SharedSemiFuture<void>> CollectionShardingRuntime::getCriticalSectionSignal( OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op) { - auto csrLock = CSRLock::lockShared(opCtx, this); return _critSec.getSignal(op); } void CollectionShardingRuntime::setFilteringMetadata(OperationContext* opCtx, CollectionMetadata newMetadata) { - const auto csrLock = CSRLock::lockExclusive(opCtx, this); - setFilteringMetadata_withLock(opCtx, newMetadata, csrLock); -} - -void CollectionShardingRuntime::setFilteringMetadata_withLock(OperationContext* opCtx, - CollectionMetadata newMetadata, - const CSRLock& csrExclusiveLock) { invariant(!newMetadata.isSharded() || !_nss.isNamespaceAlwaysUnsharded(), str::stream() << "Namespace " << _nss.ns() << " must never be sharded."); @@ -231,7 +223,6 @@ void CollectionShardingRuntime::setFilteringMetadata_withLock(OperationContext* void CollectionShardingRuntime::_clearFilteringMetadata(OperationContext* opCtx, bool clearMetadataManager) { - const auto csrLock = CSRLock::lockExclusive(opCtx, this); if (_shardVersionInRecoverOrRefresh) { _shardVersionInRecoverOrRefresh->cancellationSource.cancel(); } @@ -280,7 +271,8 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, const StatusWith<SharedSemiFuture<void>> swOrphanCleanupFuture = [&]() -> StatusWith<SharedSemiFuture<void>> { AutoGetCollection autoColl(opCtx, nss, MODE_IX); - auto* const self = CollectionShardingRuntime::get(opCtx, nss); + auto self = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); stdx::lock_guard lk(self->_metadataManagerLock); // If the metadata was reset, or the collection was dropped and recreated since the @@ -393,8 +385,6 @@ CollectionShardingRuntime::_getMetadataWithVersionCheckAt( const auto& receivedShardVersion = optReceivedShardVersion ? *optReceivedShardVersion : ShardVersion::IGNORED(); - auto csrLock = CSRLock::lockShared(opCtx, this); - { auto criticalSectionSignal = _critSec.getSignal( opCtx->lockState()->isWriteLocked() ? ShardingMigrationCriticalSection::kWrite @@ -479,26 +469,24 @@ size_t CollectionShardingRuntime::numberOfRangesScheduledForDeletion() const { void CollectionShardingRuntime::setShardVersionRecoverRefreshFuture( - SharedSemiFuture<void> future, CancellationSource cancellationSource, const CSRLock&) { + SharedSemiFuture<void> future, CancellationSource cancellationSource) { invariant(!_shardVersionInRecoverOrRefresh); _shardVersionInRecoverOrRefresh.emplace(std::move(future), std::move(cancellationSource)); } boost::optional<SharedSemiFuture<void>> CollectionShardingRuntime::getShardVersionRecoverRefreshFuture(OperationContext* opCtx) { - auto csrLock = CSRLock::lockShared(opCtx, this); return _shardVersionInRecoverOrRefresh ? boost::optional<SharedSemiFuture<void>>(_shardVersionInRecoverOrRefresh->future) : boost::none; } -void CollectionShardingRuntime::resetShardVersionRecoverRefreshFuture(const CSRLock&) { +void CollectionShardingRuntime::resetShardVersionRecoverRefreshFuture() { invariant(_shardVersionInRecoverOrRefresh); _shardVersionInRecoverOrRefresh = boost::none; } boost::optional<Timestamp> CollectionShardingRuntime::getIndexVersion(OperationContext* opCtx) { - auto csrLock = CSRLock::lockShared(opCtx, this); return _globalIndexesInfo ? _globalIndexesInfo->getVersion() : boost::none; } @@ -510,7 +498,6 @@ boost::optional<GlobalIndexesCache>& CollectionShardingRuntime::getIndexes( void CollectionShardingRuntime::addIndex(OperationContext* opCtx, const IndexCatalogType& index, const Timestamp& indexVersion) { - auto csrLock = CSRLock::lockExclusive(opCtx, this); if (_globalIndexesInfo) { _globalIndexesInfo->add(index, indexVersion); } else { @@ -523,14 +510,12 @@ void CollectionShardingRuntime::addIndex(OperationContext* opCtx, void CollectionShardingRuntime::removeIndex(OperationContext* opCtx, const std::string& name, const Timestamp& indexVersion) { - auto csrLock = CSRLock::lockExclusive(opCtx, this); tassert( 7019500, "Index information does not exist on CSR", _globalIndexesInfo.is_initialized()); _globalIndexesInfo->remove(name, indexVersion); } void CollectionShardingRuntime::clearIndexes(OperationContext* opCtx) { - auto csrLock = CSRLock::lockExclusive(opCtx, this); _globalIndexesInfo = boost::none; } @@ -546,18 +531,18 @@ CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx, AutoGetCollection::Options{}.deadline( _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load()))); - auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - invariant(csr->getCurrentMetadataIfKnown()); - csr->enterCriticalSectionCatchUpPhase(csrLock, _reason); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, _nss, CSRAcquisitionMode::kExclusive); + invariant(scopedCsr->getCurrentMetadataIfKnown()); + scopedCsr->enterCriticalSectionCatchUpPhase(_reason); } CollectionCriticalSection::~CollectionCriticalSection() { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, _nss, MODE_IX); - auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); - csr->exitCriticalSection(csrLock, _reason); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, _nss, CSRAcquisitionMode::kExclusive); + scopedCsr->exitCriticalSection(_reason); } void CollectionCriticalSection::enterCommitPhase() { @@ -567,10 +552,10 @@ void CollectionCriticalSection::enterCommitPhase() { AutoGetCollection::Options{}.deadline( _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load()))); - auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); - invariant(csr->getCurrentMetadataIfKnown()); - csr->enterCriticalSectionCommitPhase(csrLock, _reason); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, _nss, CSRAcquisitionMode::kExclusive); + invariant(scopedCsr->getCurrentMetadataIfKnown()); + scopedCsr->enterCriticalSectionCommitPhase(_reason); } } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index d1f9c6d603d..44e46d0bc4a 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ b/src/mongo/db/s/collection_sharding_runtime.h @@ -33,13 +33,14 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_migration_critical_section.h" -#include "mongo/db/s/sharding_state_lock.h" #include "mongo/s/global_index_cache.h" #include "mongo/util/cancellation.h" #include "mongo/util/decorable.h" namespace mongo { +enum class CSRAcquisitionMode { kShared, kExclusive }; + /** * See the comments for CollectionShardingState for more information on how this class fits in the * sharding architecture. @@ -54,22 +55,34 @@ public: NamespaceString nss, std::shared_ptr<executor::TaskExecutor> rangeDeleterExecutor); - using CSRLock = ShardingStateLock<CollectionShardingRuntime>; - /** - * Obtains the sharding runtime state for the specified collection. If it does not exist, it - * will be created and will remain active until the collection is dropped or unsharded. - * - * Must be called with some lock held on the specific collection being looked up and the - * returned pointer should never be stored. + * Obtains the sharding runtime for the specified collection, along with a resource lock + * protecting it from concurrent modifications, which will be held until the object goes out of + * scope. */ - static CollectionShardingRuntime* get(OperationContext* opCtx, const NamespaceString& nss); + class ScopedCollectionShardingRuntime { + public: + ScopedCollectionShardingRuntime(ScopedCollectionShardingRuntime&&) = default; - /** - * Obtains the sharding runtime state from the the specified sharding collection state. The - * returned pointer should never be stored. - */ - static CollectionShardingRuntime* get(CollectionShardingState* css); + CollectionShardingRuntime* operator->() const { + return checked_cast<CollectionShardingRuntime*>(&*_scopedCss); + } + CollectionShardingRuntime& operator*() const { + return checked_cast<CollectionShardingRuntime&>(*_scopedCss); + } + + private: + friend class CollectionShardingRuntime; + + ScopedCollectionShardingRuntime(ScopedCollectionShardingState&& scopedCss); + + ScopedCollectionShardingState _scopedCss; + }; + static ScopedCollectionShardingRuntime assertCollectionLockedAndAcquire( + OperationContext* opCtx, const NamespaceString& nss, CSRAcquisitionMode mode); + static ScopedCollectionShardingState acquire(OperationContext* opCtx, + const NamespaceString& nss, + CSRAcquisitionMode mode) = delete; const NamespaceString& nss() const override { return _nss; @@ -107,10 +120,6 @@ public: */ void setFilteringMetadata(OperationContext* opCtx, CollectionMetadata newMetadata); - void setFilteringMetadata_withLock(OperationContext* opCtx, - CollectionMetadata newMetadata, - const CSRLock& csrExclusiveLock); - /** * Marks the collection's filtering metadata as UNKNOWN, meaning that all attempts to check for * shard version match will fail with StaleConfig errors in order to trigger an update. @@ -135,33 +144,29 @@ public: * * Entering into the Critical Section interrupts any ongoing filtering metadata refresh. */ - void enterCriticalSectionCatchUpPhase(const CSRLock&, const BSONObj& reason); - void enterCriticalSectionCommitPhase(const CSRLock&, const BSONObj& reason); + void enterCriticalSectionCatchUpPhase(const BSONObj& reason); + void enterCriticalSectionCommitPhase(const BSONObj& reason); /** * It transitions the critical section back to the catch up phase. */ - void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const CSRLock&, const BSONObj& reason); + void rollbackCriticalSectionCommitPhaseToCatchUpPhase(const BSONObj& reason); /** - * Method to control the collection's critical secion. Method listed below must be called with - * the CSRLock in exclusive mode. - * - * In this method, the CSRLock ensures concurrent access to the critical section. + * Method to control the collection's critical section. Methods listed below must be called with + * both the collection lock and CSR acquired in exclusive mode. */ - void exitCriticalSection(const CSRLock&, const BSONObj& reason); + void exitCriticalSection(const BSONObj& reason); /** * Same semantics than 'exitCriticalSection' but without doing error-checking. Only meant to be * used when recovering the critical sections in the RecoverableCriticalSectionService. */ - void exitCriticalSectionNoChecks(const CSRLock&); + void exitCriticalSectionNoChecks(); /** * If the collection is currently in a critical section, returns the critical section signal to * be waited on. Otherwise, returns nullptr. - * - * This method internally acquires the CSRLock in MODE_IS. */ boost::optional<SharedSemiFuture<void>> getCriticalSectionSignal( OperationContext* opCtx, ShardingMigrationCriticalSection::Operation op); @@ -204,29 +209,22 @@ public: * Initializes the shard version recover/refresh shared semifuture for other threads to wait on * it. * - * In this method, the CSRLock ensures concurrent access to the shared semifuture. - * * To invoke this method, the criticalSectionSignal must not be hold by a different thread. */ void setShardVersionRecoverRefreshFuture(SharedSemiFuture<void> future, - CancellationSource cancellationSource, - const CSRLock&); + CancellationSource cancellationSource); /** * If there an ongoing shard version recover/refresh, it returns the shared semifuture to be * waited on. Otherwise, returns boost::none. - * - * This method internally acquires the CSRLock in MODE_IS. */ boost::optional<SharedSemiFuture<void>> getShardVersionRecoverRefreshFuture( OperationContext* opCtx); /** * Resets the shard version recover/refresh shared semifuture to boost::none. - * - * In this method, the CSRLock ensures concurrent access to the shared semifuture. */ - void resetShardVersionRecoverRefreshFuture(const CSRLock&); + void resetShardVersionRecoverRefreshFuture(); /** * Gets an index version under a lock. @@ -258,8 +256,6 @@ public: void clearIndexes(OperationContext* opCtx); private: - friend CSRLock; - struct ShardVersionRecoverOrRefresh { public: ShardVersionRecoverOrRefresh(SharedSemiFuture<void> future, @@ -304,13 +300,7 @@ private: // The executor used for deleting ranges of orphan chunks. std::shared_ptr<executor::TaskExecutor> _rangeDeleterExecutor; - // Object-wide ResourceMutex to protect changes to the CollectionShardingRuntime or objects held - // within (including the MigrationSourceManager, which is a decoration on the CSR). Use only the - // CSRLock to lock this mutex. - Lock::ResourceMutex _stateChangeMutex; - // Tracks the migration critical section state for this collection. - // Must hold CSRLock while accessing. ShardingMigrationCriticalSection _critSec; // Protects state around the metadata manager below diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index 0c6d0cd74d6..6e792a8db0e 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -391,12 +391,10 @@ public: CollectionShardingRuntimeTest::tearDown(); } - // Creates the CSR if it does not exist and stashes it in the CollectionShardingStateMap. This - // is required for waitForClean tests which use CollectionShardingRuntime::get(). - CollectionShardingRuntime& csr() { + CollectionShardingRuntime::ScopedCollectionShardingRuntime csr() { AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); - auto* css = CollectionShardingState::get(operationContext(), kTestNss); - return *checked_cast<CollectionShardingRuntime*>(css); + return CollectionShardingRuntime::assertCollectionLockedAndAcquire( + operationContext(), kTestNss, CSRAcquisitionMode::kShared); } const UUID& uuid() const { @@ -442,7 +440,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, WaitForCleanReturnsErrorIfCollectionUUIDDoesNotMatchFilteringMetadata) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); auto randomUuid = UUID::gen(); auto status = CollectionShardingRuntime::waitForClean( @@ -458,7 +456,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, WaitForCleanReturnsOKIfNoDeletionsAreScheduled) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); auto status = CollectionShardingRuntime::waitForClean( opCtx, @@ -480,7 +478,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); @@ -501,7 +499,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, WaitForCleanBlocksBehindAllScheduledDeletions) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); const auto middleKey = 5; const ChunkRange range1 = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << middleKey)); @@ -535,7 +533,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, WaitForCleanReturnsOKAfterSuccessfulDeletion) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); @@ -557,7 +555,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); @@ -566,8 +564,8 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, opCtx, task, SemiFuture<void>::makeReady() /* waitForActiveQueries */); // Clear and set again filtering metadata - csr().clearFilteringMetadata(opCtx); - csr().setFilteringMetadata(opCtx, metadata); + csr()->clearFilteringMetadata(opCtx); + csr()->setFilteringMetadata(opCtx, metadata); auto waitForCleanUp = [&](Date_t timeout) { return CollectionShardingRuntime::waitForClean(opCtx, kTestNss, uuid(), range, timeout); @@ -606,15 +604,15 @@ public: TEST_F(CollectionShardingRuntimeWithCatalogTest, TestGlobalIndexesCache) { OperationContext* opCtx = operationContext(); - ASSERT_EQ(false, csr().getIndexes(opCtx).is_initialized()); + ASSERT_EQ(false, csr()->getIndexes(opCtx).is_initialized()); Timestamp indexVersion(1, 0); addGlobalIndexCatalogEntryToCollection( opCtx, kTestNss, "x_1", BSON("x" << 1), BSONObj(), uuid(), indexVersion, boost::none); - ASSERT_EQ(true, csr().getIndexes(opCtx).is_initialized()); - ASSERT_EQ(indexVersion, *csr().getIndexes(opCtx)->getVersion()); - ASSERT_EQ(indexVersion, *csr().getIndexVersion(opCtx)); + ASSERT_EQ(true, csr()->getIndexes(opCtx).is_initialized()); + ASSERT_EQ(indexVersion, *csr()->getIndexes(opCtx)->getVersion()); + ASSERT_EQ(indexVersion, *csr()->getIndexVersion(opCtx)); } } // namespace } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index cf8fed2816e..68042189e77 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -48,6 +48,14 @@ public: CollectionShardingStateMap(std::unique_ptr<CollectionShardingStateFactory> factory) : _factory(std::move(factory)) {} + struct CSSAndLock { + CSSAndLock(std::unique_ptr<CollectionShardingState> css) + : cssMutex("CSSMutex::" + css->nss().toString()), css(std::move(css)) {} + + const Lock::ResourceMutex cssMutex; + std::unique_ptr<CollectionShardingState> css; + }; + /** * Joins the factory, waiting for any outstanding tasks using the factory to be finished. Must * be called before destruction. @@ -56,17 +64,18 @@ public: _factory->join(); } - std::shared_ptr<CollectionShardingState> getOrCreate(const NamespaceString& nss) { + CSSAndLock* getOrCreate(const NamespaceString& nss) noexcept { stdx::lock_guard<Latch> lg(_mutex); auto it = _collections.find(nss.ns()); if (it == _collections.end()) { - auto inserted = _collections.try_emplace(nss.ns(), _factory->make(nss)); + auto inserted = _collections.try_emplace( + nss.ns(), std::make_unique<CSSAndLock>(_factory->make(nss))); invariant(inserted.second); it = std::move(inserted.first); } - return it->second; + return it->second.get(); } void appendInfoForShardingStateCommand(BSONObjBuilder* builder) { @@ -75,7 +84,7 @@ public: { stdx::lock_guard<Latch> lg(_mutex); for (const auto& coll : _collections) { - coll.second->appendShardVersion(builder); + coll.second->css->appendShardVersion(builder); } } @@ -86,13 +95,13 @@ public: if (!mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { auto totalNumberOfRangesScheduledForDeletion = ([this] { stdx::lock_guard lg(_mutex); - return std::accumulate(_collections.begin(), - _collections.end(), - 0LL, - [](long long total, const auto& coll) { - return total + - coll.second->numberOfRangesScheduledForDeletion(); - }); + return std::accumulate( + _collections.begin(), + _collections.end(), + 0LL, + [](long long total, const auto& coll) { + return total + coll.second->css->numberOfRangesScheduledForDeletion(); + }); })(); builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion); @@ -110,11 +119,13 @@ public: } private: - using CollectionsMap = StringMap<std::shared_ptr<CollectionShardingState>>; - std::unique_ptr<CollectionShardingStateFactory> _factory; Mutex _mutex = MONGO_MAKE_LATCH("CollectionShardingStateMap::_mutex"); + + // Entries of the _collections map must never be deleted or replaced. This is to guarantee that + // a 'nss' is always associated to the same 'ResourceMutex'. + using CollectionsMap = StringMap<std::unique_ptr<CSSAndLock>>; CollectionsMap _collections; }; @@ -124,19 +135,42 @@ const ServiceContext::Decoration<boost::optional<CollectionShardingStateMap>> } // namespace -CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx, - const NamespaceString& nss) { - // Collection lock must be held to have a reference to the collection's sharding state +CollectionShardingState::ScopedCollectionShardingState::ScopedCollectionShardingState( + Lock::ResourceLock lock, CollectionShardingState* css) + : _lock(std::move(lock)), _css(css) {} + +CollectionShardingState::ScopedCollectionShardingState::ScopedCollectionShardingState( + ScopedCollectionShardingState&& other) + : _lock(std::move(other._lock)), _css(other._css) { + other._css = nullptr; +} + +CollectionShardingState::ScopedCollectionShardingState::~ScopedCollectionShardingState() = default; + +CollectionShardingState::ScopedCollectionShardingState +CollectionShardingState::ScopedCollectionShardingState::acquireScopedCollectionShardingState( + OperationContext* opCtx, const NamespaceString& nss, LockMode mode) { + CollectionShardingStateMap::CSSAndLock* cssAndLock = + CollectionShardingStateMap::get(opCtx->getServiceContext())->getOrCreate(nss); + + // First lock the RESOURCE_MUTEX associated to this nss to guarantee stability of the + // CollectionShardingState* . After that, it is safe to get and store the + // CollectionShadingState*, as long as the RESOURCE_MUTEX is kept locked. + Lock::ResourceLock lock(opCtx->lockState(), cssAndLock->cssMutex.getRid(), mode); + return ScopedCollectionShardingState(std::move(lock), cssAndLock->css.get()); +} + +CollectionShardingState::ScopedCollectionShardingState +CollectionShardingState::assertCollectionLockedAndAcquire(OperationContext* opCtx, + const NamespaceString& nss) { dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS)); - auto& collectionsMap = CollectionShardingStateMap::get(opCtx->getServiceContext()); - return collectionsMap->getOrCreate(nss).get(); + return acquire(opCtx, nss); } -std::shared_ptr<CollectionShardingState> CollectionShardingState::getSharedForLockFreeReads( +CollectionShardingState::ScopedCollectionShardingState CollectionShardingState::acquire( OperationContext* opCtx, const NamespaceString& nss) { - auto& collectionsMap = CollectionShardingStateMap::get(opCtx->getServiceContext()); - return collectionsMap->getOrCreate(nss); + return ScopedCollectionShardingState::acquireScopedCollectionShardingState(opCtx, nss, MODE_IS); } void CollectionShardingState::appendInfoForShardingStateCommand(OperationContext* opCtx, diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index bc68c9417d1..5ae72e1cda4 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -64,13 +64,39 @@ public: CollectionShardingState& operator=(const CollectionShardingState&) = delete; /** - * Obtains the sharding state for the specified collection. If it does not exist, it will be - * created and will remain in memory until the collection is dropped. - * - * Must be called with some lock held on the specific collection being looked up and the - * returned pointer must not be stored. + * Obtains the sharding state for the specified collection, along with a resource lock + * protecting it from concurrent modifications, which will be held util the object goes out of + * scope. */ - static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss); + class ScopedCollectionShardingState { + public: + ScopedCollectionShardingState(ScopedCollectionShardingState&&); + + ~ScopedCollectionShardingState(); + + CollectionShardingState* operator->() const { + return _css; + } + CollectionShardingState& operator*() const { + return *_css; + } + + private: + friend class CollectionShardingState; + friend class CollectionShardingRuntime; + + ScopedCollectionShardingState(Lock::ResourceLock lock, CollectionShardingState* css); + + static ScopedCollectionShardingState acquireScopedCollectionShardingState( + OperationContext* opCtx, const NamespaceString& nss, LockMode mode); + + Lock::ResourceLock _lock; + CollectionShardingState* _css; + }; + static ScopedCollectionShardingState assertCollectionLockedAndAcquire( + OperationContext* opCtx, const NamespaceString& nss); + static ScopedCollectionShardingState acquire(OperationContext* opCtx, + const NamespaceString& nss); /** * Returns the names of the collections that have a CollectionShardingState. @@ -78,14 +104,6 @@ public: static std::vector<NamespaceString> getCollectionNames(OperationContext* opCtx); /** - * Obtain a pointer to the CollectionShardingState that remains safe to access without holding - * a collection lock. Should be called instead of the regular get() if no collection lock is - * held. The returned CollectionShardingState instance should not be modified! - */ - static std::shared_ptr<CollectionShardingState> getSharedForLockFreeReads( - OperationContext* opCtx, const NamespaceString& nss); - - /** * Reports all collections which have filtering information associated. */ static void appendInfoForShardingStateCommand(OperationContext* opCtx, BSONObjBuilder* builder); @@ -186,7 +204,7 @@ public: virtual void join() = 0; /** - * Called by the CollectionShardingState::get method once per newly cached namespace. It is + * Called by the CollectionShardingState::acquire method once per newly cached namespace. It is * invoked under a mutex and must not acquire any locks or do blocking work. * * Implementations must be thread-safe when called from multiple threads. diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index ed9f90eaf88..7eb8f9cff2e 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -1273,7 +1273,9 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx, // operation to refresh the metadata. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); AutoGetCollection autoColl(opCtx, nss(), MODE_IX); - CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss(), CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); throw; } diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index 9d0bc601411..b306ff6fc8d 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -31,6 +31,7 @@ #include "mongo/db/catalog/collection_uuid_mismatch.h" #include "mongo/db/db_raii.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_logging.h" @@ -64,8 +65,9 @@ DropReply DropCollectionCoordinator::dropCollectionLocally(OperationContext* opC }(); // Clear CollectionShardingRuntime entry - auto* csr = CollectionShardingRuntime::get(opCtx, nss); - csr->clearFilteringMetadataForDroppedCollection(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadataForDroppedCollection(opCtx); } // Remove all range deletion task documents present on disk for the collection to drop. This is diff --git a/src/mongo/db/s/drop_collection_coordinator.h b/src/mongo/db/s/drop_collection_coordinator.h index 671dd979b61..765f11286f1 100644 --- a/src/mongo/db/s/drop_collection_coordinator.h +++ b/src/mongo/db/s/drop_collection_coordinator.h @@ -30,9 +30,9 @@ #pragma once #include "mongo/db/catalog/drop_collection.h" -#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/drop_collection_coordinator_document_gen.h" #include "mongo/db/s/sharding_ddl_coordinator.h" + namespace mongo { class DropCollectionCoordinator final diff --git a/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp b/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp index 7c49db1f8a4..225adb96b4b 100644 --- a/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp +++ b/src/mongo/db/s/flush_routing_table_cache_updates_command.cpp @@ -27,9 +27,6 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" @@ -50,7 +47,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - namespace mongo { namespace { @@ -128,9 +124,10 @@ public: // inclusive of the commit (and new writes to the committed chunk) that hasn't yet // propagated back to this shard. This ensures the read your own writes causal // consistency guarantee. - auto const csr = CollectionShardingRuntime::get(opCtx, ns()); - criticalSectionSignal = - csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, ns(), CSRAcquisitionMode::kShared); + criticalSectionSignal = scopedCsr->getCriticalSectionSignal( + opCtx, ShardingMigrationCriticalSection::kWrite); } if (criticalSectionSignal) diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp index e5a940af46b..db220d06ad0 100644 --- a/src/mongo/db/s/get_shard_version_command.cpp +++ b/src/mongo/db/s/get_shard_version_command.cpp @@ -98,9 +98,10 @@ public: nss, MODE_IS, AutoGetCollection::Options{}.viewMode(auto_get_collection::ViewMode::kViewsPermitted)); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); - const auto optMetadata = csr->getCurrentMetadataIfKnown(); + auto optMetadata = scopedCsr->getCurrentMetadataIfKnown(); if (!optMetadata) { result.append("global", "UNKNOWN"); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index f22cf09e623..9c15197b3be 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -202,11 +202,11 @@ void LogTransactionOperationsForShardingHandler::commit(OperationContext* opCtx, const auto& nss = stmt.getNss(); auto opCtx = cc().getOperationContext(); - auto csr = CollectionShardingRuntime::get(opCtx, nss); UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + auto scopedCss = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); - const auto clonerPtr = MigrationSourceManager::getCurrentCloner(csr, csrLock); + auto clonerPtr = MigrationSourceManager::getCurrentCloner(*scopedCss); if (!clonerPtr) { continue; } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 8bb1cde264d..f2d391d2c57 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -77,10 +77,10 @@ public: _autoColl->getCollection()); { - auto csr = CollectionShardingRuntime::get(opCtx, *nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, *nss, CSRAcquisitionMode::kShared); - if (auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock)) { + if (auto cloner = MigrationSourceManager::getCurrentCloner(*scopedCsr)) { _chunkCloner = std::dynamic_pointer_cast<MigrationChunkClonerSourceLegacy, MigrationChunkClonerSource>(cloner); invariant(_chunkCloner); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index d9e965e366f..b43c52d607d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -27,8 +27,6 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" @@ -184,7 +182,8 @@ protected: AutoGetDb autoDb(operationContext(), kNss.dbName(), MODE_IX); Lock::CollectionLock collLock(operationContext(), kNss, MODE_IX); - CollectionShardingRuntime::get(operationContext(), kNss) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + operationContext(), kNss, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata( operationContext(), CollectionMetadata( diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index febea6e0fc8..a5c18af46ab 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -267,7 +267,8 @@ SharedSemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient auto waitForActiveQueriesToComplete = [&]() { AutoGetCollection autoColl(opCtx, deletionTask.getNss(), MODE_IS); - return CollectionShardingRuntime::get(opCtx, deletionTask.getNss()) + return CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, deletionTask.getNss(), CSRAcquisitionMode::kShared) ->getOngoingQueriesCompletionFuture(deletionTask.getCollectionUuid(), deletionTask.getRange()) .semi(); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 1209e173bca..455aa946da7 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -892,12 +892,11 @@ void MigrationDestinationManager::_dropLocalIndexesIfNecessary( const CollectionOptionsAndIndexes& collectionOptionsAndIndexes) { bool dropNonDonorIndexes = [&]() -> bool { AutoGetCollection autoColl(opCtx, nss, MODE_IS); - auto* const css = CollectionShardingRuntime::get(opCtx, nss); - const auto optMetadata = css->getCurrentMetadataIfKnown(); - - // Only attempt to drop a collection's indexes if we have valid metadata and the - // collection is sharded. - if (optMetadata) { + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + // Only attempt to drop a collection's indexes if we have valid metadata and the collection + // is sharded + if (auto optMetadata = scopedCsr->getCurrentMetadataIfKnown()) { const auto& metadata = *optMetadata; if (metadata.isSharded()) { auto chunks = metadata.getChunks(); @@ -1895,7 +1894,9 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi if (refreshFailed) { AutoGetCollection autoColl(opCtx, _nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, _nss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); } // Release the critical section diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index fc9f046339a..9b5e41c8481 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -134,8 +134,9 @@ public: const auto collectionEpoch = [&] { AutoGetCollection autoColl(opCtx, nss, MODE_IS); - auto const optMetadata = - CollectionShardingRuntime::get(opCtx, nss)->getCurrentMetadataIfKnown(); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + auto optMetadata = scopedCsr->getCurrentMetadataIfKnown(); uassert(StaleConfigInfo(nss, ShardVersion::IGNORED() /* receivedVersion */, boost::none /* wantedVersion */, diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 340e3c0a983..484e97abd2f 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -111,14 +111,13 @@ MONGO_FAIL_POINT_DEFINE(hangBeforePostMigrationCommitRefresh); } // namespace -MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime* csr, - CollectionShardingRuntime::CSRLock& csrLock) { +MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime& csr) { return msmForCsr(csr); } std::shared_ptr<MigrationChunkClonerSource> MigrationSourceManager::getCurrentCloner( - CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock) { - auto msm = get(csr, csrLock); + CollectionShardingRuntime& csr) { + auto msm = get(csr); if (!msm) return nullptr; return msm->_cloneDriver; @@ -182,9 +181,8 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, const auto [collectionMetadata, collectionUUID] = [&] { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, nss(), MODE_IS); - - auto* const csr = CollectionShardingRuntime::get(_opCtx, nss()); - const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss(), CSRAcquisitionMode::kExclusive); const auto metadata = checkCollectionIdentity(_opCtx, nss(), _args.getEpoch(), boost::none); @@ -198,7 +196,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, "Collection is undergoing changes so moveChunk is not allowed.", metadata.allowMigrations()); - _scopedRegisterer.emplace(this, csr, csrLock); + _scopedRegisterer.emplace(this, *scopedCsr); return std::make_tuple(std::move(metadata), std::move(collectionUUID)); }(); @@ -272,8 +270,8 @@ void MigrationSourceManager::startClone() { _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load()))); - auto* const csr = CollectionShardingRuntime::get(_opCtx, nss()); - const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, nss(), CSRAcquisitionMode::kExclusive); // Having the metadata manager registered on the collection sharding state is what indicates // that a chunk on that collection is being migrated to the OpObservers. With an active @@ -473,7 +471,9 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); - CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, nss(), CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(_opCtx); } scopedGuard.dismiss(); _cleanup(false); @@ -510,7 +510,9 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); - CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, nss(), CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(_opCtx); } scopedGuard.dismiss(); _cleanup(false); @@ -626,9 +628,10 @@ CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() { auto metadata = [&] { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, _args.getCommandParameter(), MODE_IS); - auto* const css = CollectionShardingRuntime::get(_opCtx, _args.getCommandParameter()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, _args.getCommandParameter(), CSRAcquisitionMode::kShared); - const auto optMetadata = css->getCurrentMetadataIfKnown(); + const auto optMetadata = scopedCsr->getCurrentMetadataIfKnown(); uassert(ErrorCodes::ConflictingOperationInProgress, "The collection's sharding state was cleared by a concurrent operation", optMetadata); @@ -653,8 +656,8 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { // Unregister from the collection's sharding state and exit the migration critical section. UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); - auto* const csr = CollectionShardingRuntime::get(_opCtx, nss()); - const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, nss(), CSRAcquisitionMode::kExclusive); if (_state != kCreated) { invariant(_cloneDriver); @@ -742,7 +745,9 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { // the next op to recover. UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); - CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, nss(), CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(_opCtx); } } @@ -757,10 +762,8 @@ BSONObj MigrationSourceManager::getMigrationStatusReport() const { _args.getMax().value_or(BSONObj())); } -MigrationSourceManager::ScopedRegisterer::ScopedRegisterer( - MigrationSourceManager* msm, - CollectionShardingRuntime* csr, - const CollectionShardingRuntime::CSRLock& csrLock) +MigrationSourceManager::ScopedRegisterer::ScopedRegisterer(MigrationSourceManager* msm, + CollectionShardingRuntime& csr) : _msm(msm) { invariant(nullptr == std::exchange(msmForCsr(csr), msm)); } @@ -768,9 +771,9 @@ MigrationSourceManager::ScopedRegisterer::ScopedRegisterer( MigrationSourceManager::ScopedRegisterer::~ScopedRegisterer() { UninterruptibleLockGuard noInterrupt(_msm->_opCtx->lockState()); AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getCommandParameter(), MODE_IX); - auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getCommandParameter()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_msm->_opCtx, csr); - invariant(_msm == std::exchange(msmForCsr(csr), nullptr)); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _msm->_opCtx, _msm->_args.getCommandParameter(), CSRAcquisitionMode::kExclusive); + invariant(_msm == std::exchange(msmForCsr(*scopedCsr), nullptr)); } } // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index b7819432c49..0f517fae185 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -77,17 +77,14 @@ public: * Retrieves the MigrationSourceManager pointer that corresponds to the given collection under * a CollectionShardingRuntime that has its ResourceMutex locked. */ - static MigrationSourceManager* get(CollectionShardingRuntime* csr, - CollectionShardingRuntime::CSRLock& csrLock); + static MigrationSourceManager* get(CollectionShardingRuntime& csr); /** * If the currently installed migration has reached the cloning stage (i.e., after startClone), * returns the cloner currently in use. - * - * Must be called with a both a collection lock and the CSRLock. */ static std::shared_ptr<MigrationChunkClonerSource> getCurrentCloner( - CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock); + CollectionShardingRuntime& csr); /** * Instantiates a new migration source manager with the specified migration parameters. Must be @@ -251,9 +248,7 @@ private: // sharding runtime for the collection class ScopedRegisterer { public: - ScopedRegisterer(MigrationSourceManager* msm, - CollectionShardingRuntime* csr, - const CollectionShardingRuntime::CSRLock& csrLock); + ScopedRegisterer(MigrationSourceManager* msm, CollectionShardingRuntime& csr); ~ScopedRegisterer(); private: diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 104d7e43f18..89b1a5d8f3a 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -425,9 +425,9 @@ ExecutorFuture<void> cleanUpRange(ServiceContext* serviceContext, AutoGetCollection autoColl( opCtx, NamespaceStringOrUUID{dbName, collectionUuid}, MODE_IS); optNss.emplace(autoColl.getNss()); - auto csr = CollectionShardingRuntime::get(opCtx, *optNss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto optCollDescr = csr->getCurrentMetadataIfKnown(); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, *optNss, CSRAcquisitionMode::kShared); + auto optCollDescr = scopedCsr->getCurrentMetadataIfKnown(); if (optCollDescr) { uassert(ErrorCodes:: @@ -449,7 +449,7 @@ ExecutorFuture<void> cleanUpRange(ServiceContext* serviceContext, ? CollectionShardingRuntime::kNow : CollectionShardingRuntime::kDelayed; - return csr->cleanUpRange(deletionTask.getRange(), whenToClean); + return scopedCsr->cleanUpRange(deletionTask.getRange(), whenToClean); } } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) { uasserted( @@ -950,7 +950,9 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) { { AutoGetCollection autoColl(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); } asyncRecoverMigrationUntilSuccessOrStepDown(opCtx, nss); @@ -1024,14 +1026,14 @@ void recoverMigrationCoordinations(OperationContext* opCtx, auto setFilteringMetadata = [&opCtx, ¤tMetadata, &doc, &cancellationToken]() { AutoGetDb autoDb(opCtx, doc.getNss().dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, doc.getNss(), MODE_IX); - auto* const csr = CollectionShardingRuntime::get(opCtx, doc.getNss()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, doc.getNss(), CSRAcquisitionMode::kExclusive); - auto optMetadata = csr->getCurrentMetadataIfKnown(); + auto optMetadata = scopedCsr->getCurrentMetadataIfKnown(); invariant(!optMetadata); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); if (!cancellationToken.isCanceled()) { - csr->setFilteringMetadata_withLock(opCtx, std::move(currentMetadata), csrLock); + scopedCsr->setFilteringMetadata(opCtx, std::move(currentMetadata)); } }; diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index d023db60ab1..317992375d2 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -629,7 +629,7 @@ TEST_F( _mockCatalogClient->setCollections({coll}); auto metadata = makeShardedMetadata(opCtx, collectionUUID); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); // The task should have been submitted successfully. auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); @@ -665,7 +665,7 @@ TEST_F(SubmitRangeDeletionTaskTest, _mockCatalogClient->setCollections({matchingColl}); auto metadata = makeShardedMetadata(opCtx, collectionUUID); - csr().setFilteringMetadata(opCtx, metadata); + csr()->setFilteringMetadata(opCtx, metadata); // The task should have been submitted successfully. auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 4d156d39084..830076dbcfb 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -66,14 +66,7 @@ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, chunk.throwIfMoved(); } -bool isMigratingWithCSRLock(CollectionShardingRuntime* csr, - CollectionShardingRuntime::CSRLock& csrLock, - BSONObj const& docToDelete) { - auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); - return cloner && cloner->isDocumentInMigratingChunk(docToDelete); -} - -void assertNoMovePrimaryInProgress(OperationContext* opCtx, NamespaceString const& nss) { +void assertNoMovePrimaryInProgress(OperationContext* opCtx, const NamespaceString& nss) { if (!nss.isNormalCollection() && nss.coll() != "system.views" && !nss.isTimeseriesBucketsCollection()) { return; @@ -101,9 +94,11 @@ OpObserverShardingImpl::OpObserverShardingImpl(std::unique_ptr<OplogWriter> oplo bool OpObserverShardingImpl::isMigrating(OperationContext* opCtx, NamespaceString const& nss, BSONObj const& docToDelete) { - auto csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - return isMigratingWithCSRLock(csr, csrLock, docToDelete); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + auto cloner = MigrationSourceManager::getCurrentCloner(*scopedCsr); + + return cloner && cloner->isDocumentInMigratingChunk(docToDelete); } void OpObserverShardingImpl::shardObserveAboutToDelete(OperationContext* opCtx, @@ -123,9 +118,9 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, return; auto* const css = shardingWriteRouter.getCss(); - auto* const csr = CollectionShardingRuntime::get(css); - csr->checkShardVersionOrThrow(opCtx); + css->checkShardVersionOrThrow(opCtx); + auto* const csr = checked_cast<CollectionShardingRuntime*>(css); auto metadata = csr->getCurrentMetadataIfKnown(); if (!metadata || !metadata->isSharded()) { assertNoMovePrimaryInProgress(opCtx, nss); @@ -144,8 +139,7 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, return; } - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); + auto cloner = MigrationSourceManager::getCurrentCloner(*csr); if (cloner) { cloner->onInsertOp(opCtx, insertedDoc, opTime); } @@ -160,9 +154,9 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, const repl::OpTime& prePostImageOpTime, const bool inMultiDocumentTransaction) { auto* const css = shardingWriteRouter.getCss(); - auto* const csr = CollectionShardingRuntime::get(css); - csr->checkShardVersionOrThrow(opCtx); + css->checkShardVersionOrThrow(opCtx); + auto* const csr = checked_cast<CollectionShardingRuntime*>(css); auto metadata = csr->getCurrentMetadataIfKnown(); if (!metadata || !metadata->isSharded()) { assertNoMovePrimaryInProgress(opCtx, nss); @@ -181,8 +175,7 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, return; } - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); + auto cloner = MigrationSourceManager::getCurrentCloner(*csr); if (cloner) { cloner->onUpdateOp(opCtx, preImageDoc, postImageDoc, opTime, prePostImageOpTime); } @@ -196,9 +189,9 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, const repl::OpTime& preImageOpTime, const bool inMultiDocumentTransaction) { auto* const css = shardingWriteRouter.getCss(); - auto* const csr = CollectionShardingRuntime::get(css); - csr->checkShardVersionOrThrow(opCtx); + css->checkShardVersionOrThrow(opCtx); + auto* const csr = checked_cast<CollectionShardingRuntime*>(css); auto metadata = csr->getCurrentMetadataIfKnown(); if (!metadata || !metadata->isSharded()) { assertNoMovePrimaryInProgress(opCtx, nss); @@ -217,9 +210,7 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, return; } - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); - + auto cloner = MigrationSourceManager::getCurrentCloner(*csr); if (cloner && getIsMigrating(opCtx)) { cloner->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime); } diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h index 2c18fccc416..4e38292e80a 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.h +++ b/src/mongo/db/s/op_observer_sharding_impl.h @@ -33,8 +33,6 @@ namespace mongo { -class ShardingWriteRouter; - class OpObserverShardingImpl : public OpObserverImpl { public: OpObserverShardingImpl(std::unique_ptr<OplogWriter> oplogWriter); diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp index 85de326ecdf..6e2761e3e04 100644 --- a/src/mongo/db/s/op_observer_sharding_test.cpp +++ b/src/mongo/db/s/op_observer_sharding_test.cpp @@ -42,7 +42,8 @@ const NamespaceString kTestNss("TestDB", "TestColl"); void setCollectionFilteringMetadata(OperationContext* opCtx, CollectionMetadata metadata) { AutoGetCollection autoColl(opCtx, kTestNss, MODE_X); - CollectionShardingRuntime::get(opCtx, kTestNss) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, kTestNss, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata(opCtx, std::move(metadata)); } diff --git a/src/mongo/db/s/persistent_task_queue_test.cpp b/src/mongo/db/s/persistent_task_queue_test.cpp index 06e9ad5514d..d12562864c3 100644 --- a/src/mongo/db/s/persistent_task_queue_test.cpp +++ b/src/mongo/db/s/persistent_task_queue_test.cpp @@ -85,7 +85,8 @@ class PersistentTaskQueueTest : public ShardServerTestFixture { ShardServerTestFixture::setUp(); AutoGetDb autoDb(operationContext(), kNss.dbName(), MODE_IX); Lock::CollectionLock collLock(operationContext(), kNss, MODE_IX); - CollectionShardingRuntime::get(operationContext(), kNss) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + operationContext(), kNss, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata(operationContext(), CollectionMetadata()); } }; diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index 267905c0632..b3b56b43823 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/s/range_deleter_service.h" + #include "mongo/db/catalog_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer/op_observer_registry.h" @@ -58,7 +59,8 @@ BSONObj getShardKeyPattern(OperationContext* opCtx, AutoGetCollection collection( opCtx, NamespaceStringOrUUID{dbName.toString(), collectionUuid}, MODE_IS); - auto optMetadata = CollectionShardingRuntime::get(opCtx, collection.getNss()) + auto optMetadata = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, collection.getNss(), CSRAcquisitionMode::kShared) ->getCurrentMetadataIfKnown(); if (optMetadata && optMetadata->isSharded()) { return optMetadata->getShardKeyPattern().toBSON(); diff --git a/src/mongo/db/s/range_deleter_service_op_observer.cpp b/src/mongo/db/s/range_deleter_service_op_observer.cpp index 1ca0fb3d02d..8765dfedbbb 100644 --- a/src/mongo/db/s/range_deleter_service_op_observer.cpp +++ b/src/mongo/db/s/range_deleter_service_op_observer.cpp @@ -47,7 +47,8 @@ void registerTaskWithOngoingQueriesOnOpLogEntryCommit(OperationContext* opCtx, try { AutoGetCollection autoColl(opCtx, rdt.getNss(), MODE_IS); auto waitForActiveQueriesToComplete = - CollectionShardingRuntime::get(opCtx, rdt.getNss()) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, rdt.getNss(), CSRAcquisitionMode::kShared) ->getOngoingQueriesCompletionFuture(rdt.getCollectionUuid(), rdt.getRange()) .semi(); (void)RangeDeleterService::get(opCtx)->registerTask( diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index d2ca4ffa937..d2406120483 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -116,11 +116,9 @@ void RangeDeleterServiceTest::_setFilteringMetadataByUUID(OperationContext* opCt }(); AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_X); - - CollectionShardingRuntime::get(opCtx, nss)->setFilteringMetadata(opCtx, metadata); - auto* css = CollectionShardingState::get(opCtx, nss); - auto& csr = *checked_cast<CollectionShardingRuntime*>(css); - csr.setFilteringMetadata(opCtx, metadata); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->setFilteringMetadata(opCtx, metadata); } /** diff --git a/src/mongo/db/s/range_deleter_service_test_util.cpp b/src/mongo/db/s/range_deleter_service_test_util.cpp index fde50854c74..68c82e879b3 100644 --- a/src/mongo/db/s/range_deleter_service_test_util.cpp +++ b/src/mongo/db/s/range_deleter_service_test_util.cpp @@ -184,7 +184,9 @@ void _clearFilteringMetadataByUUID(OperationContext* opCtx, const UUID& uuid) { NamespaceString nss = RangeDeleterServiceTest::nssWithUuid[uuid]; AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_X); - CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); } } // namespace mongo diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index 5f8cafc91e0..625f0dcafc3 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" @@ -118,8 +119,10 @@ public: boost::none); AutoGetDb autoDb(_opCtx, kNss.dbName(), MODE_IX); Lock::CollectionLock collLock(_opCtx, kNss, MODE_IX); - CollectionMetadata collMetadata(std::move(cm), ShardId("dummyShardId")); - CollectionShardingRuntime::get(_opCtx, kNss)->setFilteringMetadata(_opCtx, collMetadata); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + _opCtx, kNss, CSRAcquisitionMode::kExclusive) + ->setFilteringMetadata(_opCtx, + CollectionMetadata(std::move(cm), ShardId("dummyShardId"))); } UUID uuid() const { diff --git a/src/mongo/db/s/rename_collection_participant_service.cpp b/src/mongo/db/s/rename_collection_participant_service.cpp index 910505e5cf9..b907a3e7f26 100644 --- a/src/mongo/db/s/rename_collection_participant_service.cpp +++ b/src/mongo/db/s/rename_collection_participant_service.cpp @@ -76,8 +76,9 @@ void clearFilteringMetadata(OperationContext* opCtx, const NamespaceString& nss) UninterruptibleLockGuard noInterrupt(opCtx->lockState()); Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - auto* csr = CollectionShardingRuntime::get(opCtx, nss); - csr->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); } /* @@ -323,15 +324,17 @@ SemiFuture<void> RenameParticipantInstance::_runImpl( { Lock::DBLock dbLock(opCtx, fromNss().dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, fromNss(), MODE_IX); - auto* csr = CollectionShardingRuntime::get(opCtx, fromNss()); - csr->clearFilteringMetadataForDroppedCollection(opCtx); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, fromNss(), CSRAcquisitionMode::kExclusive); + scopedCsr->clearFilteringMetadataForDroppedCollection(opCtx); } { Lock::DBLock dbLock(opCtx, toNss().dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, toNss(), MODE_IX); - auto* csr = CollectionShardingRuntime::get(opCtx, toNss()); - csr->clearFilteringMetadata(opCtx); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, toNss(), CSRAcquisitionMode::kExclusive); + scopedCsr->clearFilteringMetadata(opCtx); } snapshotRangeDeletionsForRename(opCtx, fromNss(), toNss()); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 3070482a434..f328f0b83d6 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -353,7 +353,9 @@ void clearFilteringMetadata(OperationContext* opCtx, } AutoGetCollection autoColl(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); if (!scheduleAsyncRefresh) { continue; diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index 831144c5511..627577748fa 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -275,9 +275,9 @@ protected: boost::optional<CollectionIndexes>(boost::none)) /* shardVersion */, boost::none /* databaseVersion */}; - auto csr = CollectionShardingRuntime::get(opCtx, sourceNss); - csr->setFilteringMetadata(opCtx, metadata); - ASSERT(csr->getCurrentMetadataIfKnown()); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, sourceNss, CSRAcquisitionMode::kExclusive) + ->setFilteringMetadata(opCtx, metadata); } private: @@ -584,7 +584,8 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta // Assert the prestate has no filtering metadata. for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); } @@ -600,7 +601,8 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); ASSERT(csr->getCurrentMetadataIfKnown()); } }; @@ -619,7 +621,8 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); } @@ -633,7 +636,8 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta for (auto const& nss : {kOriginalNss, kTemporaryReshardingNss}) { AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); } } @@ -670,14 +674,16 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta for (auto const& nss : {sourceNss1, tempReshardingNss1}) { AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); ASSERT(csr->getCurrentMetadataIfKnown() == boost::none); } // Assert that the filtering metadata is not cleared for other operation for (auto const& nss : {sourceNss2, tempReshardingNss2}) { AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); ASSERT(csr->getCurrentMetadataIfKnown() != boost::none); } } diff --git a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp index 8291dd8a654..575f1d58628 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_helpers.cpp @@ -27,8 +27,8 @@ * it in the license file. */ - #include "mongo/db/s/resharding/resharding_metrics_helpers.h" + #include "mongo/db/catalog_raii.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" @@ -36,10 +36,8 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding - namespace mongo { namespace resharding_metrics { - namespace { boost::optional<UUID> tryGetReshardingUUID(OperationContext* opCtx, const NamespaceString& nss) { @@ -54,8 +52,9 @@ boost::optional<UUID> tryGetReshardingUUID(OperationContext* opCtx, const Namesp // so this is considered acceptable. AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, nss); - auto metadata = csr->getCurrentMetadataIfKnown(); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + auto metadata = scopedCsr->getCurrentMetadataIfKnown(); if (!metadata || !metadata->isSharded()) { return boost::none; } @@ -105,5 +104,4 @@ void onCriticalSectionError(OperationContext* opCtx, const StaleConfigInfo& info } } // namespace resharding_metrics - } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp index 675cbf9206c..c34355c2d17 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp @@ -71,7 +71,9 @@ void assertCanExtractShardKeyFromDocs(OperationContext* opCtx, const NamespaceString& nss, std::vector<InsertStatement>::const_iterator begin, std::vector<InsertStatement>::const_iterator end) { - const auto collDesc = CollectionShardingState::get(opCtx, nss)->getCollectionDescription(opCtx); + auto collDesc = CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss) + ->getCollectionDescription(opCtx); + // A user can manually create a 'db.system.resharding.' collection that isn't guaranteed to be // sharded outside of running reshardCollection. uassert(ErrorCodes::NamespaceNotSharded, diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index 94ebf77e94b..1f542725ddb 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -110,7 +110,8 @@ public: { AutoGetCollection autoColl(opCtx.get(), _outputNss, MODE_X); - CollectionShardingRuntime::get(opCtx.get(), _outputNss) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx.get(), _outputNss, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata( opCtx.get(), CollectionMetadata(makeChunkManagerForOutputCollection(), _myDonorId)); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp index 9c718a2290b..4e86f9c1ce6 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp @@ -43,11 +43,12 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding namespace mongo { - namespace { + const WriteConcernOptions kMajorityWriteConcern{ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}; -} + +} // namespace void ReshardingRecipientService::RecipientStateMachineExternalState:: ensureTempReshardingCollectionExistsWithIndexes(OperationContext* opCtx, @@ -88,7 +89,8 @@ void ReshardingRecipientService::RecipientStateMachineExternalState:: std::move(collOptions)}); AutoGetCollection autoColl(opCtx, metadata.getTempReshardingNss(), MODE_IX); - CollectionShardingRuntime::get(opCtx, metadata.getTempReshardingNss()) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, metadata.getTempReshardingNss(), CSRAcquisitionMode::kExclusive) ->clearFilteringMetadata(opCtx); } diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 3ca79f949da..ad107728b3c 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -283,7 +283,8 @@ public: void setUnshardedFilteringMetadata(const NamespaceString& nss) { AutoGetDb autoDb(operationContext(), nss.dbName(), MODE_IX); Lock::CollectionLock collLock(operationContext(), nss, MODE_IX); - CollectionShardingRuntime::get(operationContext(), nss) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + operationContext(), nss, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata(operationContext(), CollectionMetadata()); } diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 3bff53c1488..e4aa88199fb 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -302,17 +302,17 @@ void onDbVersionMismatch(OperationContext* opCtx, */ bool joinCollectionPlacementVersionOperation( OperationContext* opCtx, - CollectionShardingRuntime* csr, boost::optional<Lock::DBLock>* dbLock, boost::optional<Lock::CollectionLock>* collLock, - boost::optional<CollectionShardingRuntime::CSRLock>* csrLock) { + boost::optional<CollectionShardingRuntime::ScopedCollectionShardingRuntime>* scopedCsr) { invariant(dbLock->has_value()); invariant(collLock->has_value()); - invariant(csrLock->has_value()); + invariant(scopedCsr->has_value()); if (auto critSecSignal = - csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)) { - csrLock->reset(); + (**scopedCsr) + ->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)) { + scopedCsr->reset(); collLock->reset(); dbLock->reset(); @@ -322,15 +322,16 @@ bool joinCollectionPlacementVersionOperation( return true; } - if (auto inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx)) { - csrLock->reset(); + if (auto inRecoverOrRefresh = (**scopedCsr)->getShardVersionRecoverRefreshFuture(opCtx)) { + scopedCsr->reset(); collLock->reset(); dbLock->reset(); try { inRecoverOrRefresh->get(opCtx); } catch (const ExceptionFor<ErrorCodes::ShardVersionRefreshCanceled>&) { - // The ongoing refresh has finished, although it was interrupted. + // The ongoing refresh has finished, although it was canceled by a + // 'clearFilteringMetadata'. } return true; @@ -372,17 +373,17 @@ SharedSemiFuture<void> recoverRefreshCollectionPlacementVersion( Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); // cancellationToken needs to be checked under the CSR lock before overwriting the // filtering metadata to serialize with other threads calling // 'clearFilteringMetadata' if (currentMetadataToInstall && !cancellationToken.isCanceled()) { - csr->setFilteringMetadata_withLock(opCtx, *currentMetadataToInstall, csrLock); + scopedCsr->setFilteringMetadata(opCtx, *currentMetadataToInstall); } - csr->resetShardVersionRecoverRefreshFuture(csrLock); + scopedCsr->resetShardVersionRecoverRefreshFuture(); }); if (runRecover) { @@ -400,12 +401,14 @@ SharedSemiFuture<void> recoverRefreshCollectionPlacementVersion( if (!currentMetadata.allowMigrations()) { boost::optional<SharedSemiFuture<void>> waitForMigrationAbort; { - Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); - Lock::CollectionLock collLock(opCtx, nss, MODE_IX); + Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IS); + Lock::CollectionLock collLock(opCtx, nss, MODE_IS); - auto const& csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (auto msm = MigrationSourceManager::get(csr, csrLock)) { + auto scopedCsr = + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + + if (auto msm = MigrationSourceManager::get(*scopedCsr)) { waitForMigrationAbort.emplace(msm->abort()); } } @@ -478,18 +481,17 @@ void onCollectionPlacementVersionMismatch(OperationContext* opCtx, dbLock.emplace(opCtx, nss.dbName(), MODE_IS); collLock.emplace(opCtx, nss, MODE_IS); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - if (chunkVersionReceived) { - boost::optional<CollectionShardingRuntime::CSRLock> csrLock = - CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + boost::optional<CollectionShardingRuntime::ScopedCollectionShardingRuntime> + scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); if (joinCollectionPlacementVersionOperation( - opCtx, csr, &dbLock, &collLock, &csrLock)) { + opCtx, &dbLock, &collLock, &scopedCsr)) { continue; } - if (auto metadata = csr->getCurrentMetadataIfKnown()) { + if (auto metadata = (*scopedCsr)->getCurrentMetadataIfKnown()) { const auto currentCollectionPlacementVersion = metadata->getShardVersion(); // Don't need to remotely reload if the requested version is smaller than the // known one. This means that the remote side is behind. @@ -500,10 +502,11 @@ void onCollectionPlacementVersionMismatch(OperationContext* opCtx, } } - boost::optional<CollectionShardingRuntime::CSRLock> csrLock = - CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + boost::optional<CollectionShardingRuntime::ScopedCollectionShardingRuntime> scopedCsr = + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); - if (joinCollectionPlacementVersionOperation(opCtx, csr, &dbLock, &collLock, &csrLock)) { + if (joinCollectionPlacementVersionOperation(opCtx, &dbLock, &collLock, &scopedCsr)) { continue; } @@ -511,15 +514,15 @@ void onCollectionPlacementVersionMismatch(OperationContext* opCtx, // and we are holding the exclusive CSR lock. // If the shard doesn't yet know its filtering metadata, recovery needs to be run - const bool runRecover = csr->getCurrentMetadataIfKnown() ? false : true; + const bool runRecover = (*scopedCsr)->getCurrentMetadataIfKnown() ? false : true; CancellationSource cancellationSource; CancellationToken cancellationToken = cancellationSource.token(); - csr->setShardVersionRecoverRefreshFuture( - recoverRefreshCollectionPlacementVersion( - opCtx->getServiceContext(), nss, runRecover, std::move(cancellationToken)), - std::move(cancellationSource), - *csrLock); - inRecoverOrRefresh = csr->getShardVersionRecoverRefreshFuture(opCtx); + (*scopedCsr) + ->setShardVersionRecoverRefreshFuture( + recoverRefreshCollectionPlacementVersion( + opCtx->getServiceContext(), nss, runRecover, std::move(cancellationToken)), + std::move(cancellationSource)); + inRecoverOrRefresh = (*scopedCsr)->getShardVersionRecoverRefreshFuture(opCtx); } try { @@ -601,8 +604,9 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, // is in the 'system.views' collection. Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss) - ->setFilteringMetadata(opCtx, CollectionMetadata()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); + scopedCsr->setFilteringMetadata(opCtx, CollectionMetadata()); return ChunkVersion::UNSHARDED(); } @@ -615,23 +619,18 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, // is in the 'system.views' collection. Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); - auto optMetadata = CollectionShardingRuntime::get(opCtx, nss)->getCurrentMetadataIfKnown(); - - // We already have newer version - if (optMetadata) { + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + if (auto optMetadata = scopedCsr->getCurrentMetadataIfKnown()) { const auto& metadata = *optMetadata; if (metadata.isSharded() && (cm.getVersion().isOlderOrEqualThan(metadata.getCollVersion()))) { - LOGV2_DEBUG( - 22063, - 1, - "Skipping refresh of metadata for {namespace} {latestCollectionVersion} with " - "an older {refreshedCollectionVersion}", - "Skipping metadata refresh because collection already has at least as recent " - "metadata", - "namespace"_attr = nss, - "latestCollectionVersion"_attr = metadata.getCollVersion(), - "refreshedCollectionVersion"_attr = cm.getVersion()); + LOGV2_DEBUG(22063, + 1, + "Skipping metadata refresh because collection already is up-to-date", + "namespace"_attr = nss, + "latestCollectionVersion"_attr = metadata.getCollVersion(), + "refreshedCollectionVersion"_attr = cm.getVersion()); return metadata.getShardVersion(); } } @@ -644,35 +643,26 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, // 'system.views' collection. Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - - { - auto optMetadata = csr->getCurrentMetadataIfKnown(); - - // We already have newer version - if (optMetadata) { - const auto& metadata = *optMetadata; - if (metadata.isSharded() && - (cm.getVersion().isOlderOrEqualThan(metadata.getCollVersion()))) { - LOGV2_DEBUG( - 22064, - 1, - "Skipping refresh of metadata for {namespace} {latestCollectionVersion} with " - "an older {refreshedCollectionVersion}", - "Skipping metadata refresh because collection already has at least as recent " - "metadata", - "namespace"_attr = nss, - "latestCollectionVersion"_attr = metadata.getCollVersion(), - "refreshedCollectionVersion"_attr = cm.getVersion()); - return metadata.getShardVersion(); - } + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); + if (auto optMetadata = scopedCsr->getCurrentMetadataIfKnown()) { + const auto& metadata = *optMetadata; + if (metadata.isSharded() && + (cm.getVersion().isOlderOrEqualThan(metadata.getCollVersion()))) { + LOGV2_DEBUG(22064, + 1, + "Skipping metadata refresh because collection already is up-to-date", + "namespace"_attr = nss, + "latestCollectionVersion"_attr = metadata.getCollVersion(), + "refreshedCollectionVersion"_attr = cm.getVersion()); + return metadata.getShardVersion(); } } CollectionMetadata metadata(cm, shardingState->shardId()); - const auto newShardVersion = metadata.getShardVersion(); + auto newShardVersion = metadata.getShardVersion(); - csr->setFilteringMetadata(opCtx, std::move(metadata)); + scopedCsr->setFilteringMetadata(opCtx, std::move(metadata)); return newShardVersion; } diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 0278b1487ca..0df49f52f89 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -27,19 +27,17 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/db/s/shard_server_op_observer.h" #include "mongo/bson/util/bson_extract.h" -#include "mongo/db/catalog/database_holder_impl.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/op_observer/op_observer_impl.h" #include "mongo/db/s/balancer_stats_registry.h" #include "mongo/db/s/chunk_split_state_driver.h" #include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/collection_critical_section_document_gen.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/global_index_ddl_util.h" #include "mongo/db/s/migration_source_manager.h" @@ -57,13 +55,11 @@ #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/cannot_implicitly_create_collection_info.h" -#include "mongo/s/catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/sharding_feature_flags_gen.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - namespace mongo { namespace { @@ -95,11 +91,12 @@ public: // Force subsequent uses of the namespace to refresh the filtering metadata so they can // synchronize with any work happening on the primary (e.g., migration critical section). UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto scopedCss = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, _nss, CSRAcquisitionMode::kExclusive); if (_droppingCollection) - CollectionShardingRuntime::get(opCtx, _nss) - ->clearFilteringMetadataForDroppedCollection(opCtx); + scopedCss->clearFilteringMetadataForDroppedCollection(opCtx); else - CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); + scopedCss->clearFilteringMetadata(opCtx); } void rollback(OperationContext* opCtx) override {} @@ -220,10 +217,10 @@ void incrementChunkOnInsertOrUpdate(OperationContext* opCtx, * Aborts any ongoing migration for the given namespace. Should only be called when observing * index operations. */ -void abortOngoingMigrationIfNeeded(OperationContext* opCtx, const NamespaceString nss) { - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (auto msm = MigrationSourceManager::get(csr, csrLock)) { +void abortOngoingMigrationIfNeeded(OperationContext* opCtx, const NamespaceString& nss) { + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared); + if (auto msm = MigrationSourceManager::get(*scopedCsr)) { // Only interrupt the migration, but don't actually join (void)msm->abort(); } @@ -241,7 +238,9 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end, bool fromMigrate) { const auto& nss = coll->ns(); - const auto metadata = CollectionShardingRuntime::get(opCtx, nss)->getCurrentMetadataIfKnown(); + auto metadata = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared) + ->getCurrentMetadataIfKnown(); for (auto it = begin; it != end; ++it) { const auto& insertedDoc = it->doc; @@ -307,11 +306,12 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, } UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss); - auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCatchUpPhase(csrLock, reason); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, insertedNss, CSRAcquisitionMode::kExclusive); + scopedCsr->enterCriticalSectionCatchUpPhase(reason); }); } + if (metadata && metadata->isSharded()) { incrementChunkOnInsertOrUpdate(opCtx, nss, @@ -377,7 +377,9 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE // Force subsequent uses of the namespace to refresh the filtering metadata so they // can synchronize with any work happening on the primary (e.g., migration critical // section). - CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, updatedNss, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); } } @@ -412,11 +414,11 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE DatabaseName dbName(boost::none, db); AutoGetDb autoDb(opCtx, dbName, MODE_X); - DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, dbName); auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( opCtx, dbName, DSSAcquisitionMode::kExclusive); scopedDss->cancelDbMetadataRefresh(); + DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, dbName); } } @@ -459,13 +461,15 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); - auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCommitPhase(csrLock, reason); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, updatedNss, CSRAcquisitionMode::kExclusive) + ->enterCriticalSectionCommitPhase(reason); }); } - auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss); - const auto metadata = csr->getCurrentMetadataIfKnown(); + + auto metadata = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, args.nss, CSRAcquisitionMode::kShared) + ->getCurrentMetadataIfKnown(); if (metadata && metadata->isSharded()) { incrementChunkOnInsertOrUpdate(opCtx, args.nss, @@ -505,14 +509,18 @@ void ShardServerOpObserver::onModifyShardedCollectionGlobalIndexCatalogEntry( auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp(); opCtx->recoveryUnit()->onCommit([opCtx, nss, indexVersion, indexEntry](auto _) { AutoGetCollection autoColl(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss)->addIndex(opCtx, indexEntry, indexVersion); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->addIndex(opCtx, indexEntry, indexVersion); }); } else { auto indexName = indexDoc["entry"][IndexCatalogType::kNameFieldName].str(); auto indexVersion = indexDoc["entry"][IndexCatalogType::kLastmodFieldName].timestamp(); opCtx->recoveryUnit()->onCommit([opCtx, nss, indexName, indexVersion](auto _) { AutoGetCollection autoColl(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss)->removeIndex(opCtx, indexName, indexVersion); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->removeIndex(opCtx, indexName, indexVersion); }); } } @@ -547,11 +555,11 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, DatabaseName dbName(boost::none, deletedDatabase); AutoGetDb autoDb(opCtx, dbName, MODE_X); - DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, dbName); auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire( opCtx, dbName, DSSAcquisitionMode::kExclusive); scopedDss->cancelDbMetadataRefresh(); + DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, dbName); } if (nss == NamespaceString::kServerConfigurationNamespace) { @@ -591,15 +599,15 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, } UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, deletedNss, CSRAcquisitionMode::kExclusive); // Secondary nodes must clear the filtering metadata before releasing the // in-memory critical section if (!isStandaloneOrPrimary(opCtx)) - csr->clearFilteringMetadata(opCtx); + scopedCsr->clearFilteringMetadata(opCtx); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock, reason); + scopedCsr->exitCriticalSection(reason); }); } @@ -651,7 +659,8 @@ void ShardServerOpObserver::onCreateCollection(OperationContext* opCtx, // Temp collections are always UNSHARDED if (options.temp) { - CollectionShardingRuntime::get(opCtx, collectionName) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, collectionName, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata(opCtx, CollectionMetadata()); return; } @@ -664,9 +673,10 @@ void ShardServerOpObserver::onCreateCollection(OperationContext* opCtx, // If the check above passes, this means the collection doesn't exist and is being created and // that the caller will be responsible to eventially set the proper shard version - auto* const csr = CollectionShardingRuntime::get(opCtx, collectionName); - if (!csr->getCurrentMetadataIfKnown()) { - csr->setFilteringMetadata(opCtx, CollectionMetadata()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, collectionName, CSRAcquisitionMode::kExclusive); + if (!scopedCsr->getCurrentMetadataIfKnown()) { + scopedCsr->setFilteringMetadata(opCtx, CollectionMetadata()); } } diff --git a/src/mongo/db/s/sharding_recovery_service.cpp b/src/mongo/db/s/sharding_recovery_service.cpp index 94d3349365d..6c75fa289ca 100644 --- a/src/mongo/db/s/sharding_recovery_service.cpp +++ b/src/mongo/db/s/sharding_recovery_service.cpp @@ -432,9 +432,9 @@ void ShardingRecoveryService::recoverRecoverableCriticalSections(OperationContex for (const auto& collName : collectionNames) { try { AutoGetCollection collLock(opCtx, collName, MODE_X); - auto* const csr = CollectionShardingRuntime::get(opCtx, collName); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSectionNoChecks(csrLock); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, collName, CSRAcquisitionMode::kExclusive) + ->exitCriticalSectionNoChecks(); } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { LOGV2_DEBUG(6050800, 2, @@ -451,11 +451,12 @@ void ShardingRecoveryService::recoverRecoverableCriticalSections(OperationContex const auto& nss = doc.getNss(); { AutoGetCollection collLock(opCtx, nss, MODE_X); - auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCatchUpPhase(csrLock, doc.getReason()); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive); + + scopedCsr->enterCriticalSectionCatchUpPhase(doc.getReason()); if (doc.getBlockReads()) - csr->enterCriticalSectionCommitPhase(csrLock, doc.getReason()); + scopedCsr->enterCriticalSectionCommitPhase(doc.getReason()); return true; } @@ -487,8 +488,9 @@ void ShardingRecoveryService::recoverIndexesCatalog(OperationContext* opCtx) { for (const auto& collName : collectionNames) { try { AutoGetCollection collLock(opCtx, collName, MODE_X); - auto* const csr = CollectionShardingRuntime::get(opCtx, collName); - csr->clearIndexes(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, collName, CSRAcquisitionMode::kExclusive) + ->clearIndexes(opCtx); } catch (const ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { LOGV2_DEBUG(6686501, 2, @@ -514,7 +516,9 @@ void ShardingRecoveryService::recoverIndexesCatalog(OperationContext* opCtx) { auto indexEntry = IndexCatalogType::parse( IDLParserContext("recoverIndexesCatalogContext"), idx.Obj()); AutoGetCollection collLock(opCtx, nss, MODE_X); - CollectionShardingRuntime::get(opCtx, nss)->addIndex(opCtx, indexEntry, indexVersion); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, collLock->ns(), CSRAcquisitionMode::kExclusive) + ->addIndex(opCtx, indexEntry, indexVersion); } } LOGV2_DEBUG(6686502, 2, "Recovered all index versions"); diff --git a/src/mongo/db/s/sharding_state_lock.h b/src/mongo/db/s/sharding_state_lock.h deleted file mode 100644 index 07fa1d8a430..00000000000 --- a/src/mongo/db/s/sharding_state_lock.h +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/stdx/variant.h" - -namespace mongo { - -/** - * RAII-style class that locks a sharding state object using the state object's ResourceMutex. The - * lock will be created and acquired on construction. The lock will be dismissed upon destruction - * of the sharding state object. - */ -template <class ShardingState> -class ShardingStateLock { -public: - /** - * Locks the sharding state object with the sharding state object's ResourceMutex in MODE_IS. - * When the object goes out of scope, the ResourceMutex will be unlocked. - */ - static ShardingStateLock lockShared(OperationContext* opCtx, ShardingState* state); - - /** - * Follows the same functionality as the ShardingStateLock lock method, except that - * lockExclusive takes the ResourceMutex in MODE_X. - */ - static ShardingStateLock lockExclusive(OperationContext* opCtx, ShardingState* state); - -private: - using StateLock = stdx::variant<Lock::SharedLock, Lock::ExclusiveLock>; - - ShardingStateLock(OperationContext* opCtx, ShardingState* state, LockMode lockMode); - - // The lock created and locked upon construction of a ShardingStateLock object. It locks the - // ResourceMutex taken from the ShardingState class, passed in on construction. - StateLock _lock; -}; - -template <class ShardingState> -ShardingStateLock<ShardingState>::ShardingStateLock(OperationContext* opCtx, - ShardingState* state, - LockMode lockMode) - : _lock([&]() -> StateLock { - invariant(lockMode == MODE_IS || lockMode == MODE_X); - return ( - lockMode == MODE_IS - ? StateLock(Lock::SharedLock(opCtx->lockState(), state->_stateChangeMutex)) - : StateLock(Lock::ExclusiveLock(opCtx->lockState(), state->_stateChangeMutex))); - }()) {} - -template <class ShardingState> -ShardingStateLock<ShardingState> ShardingStateLock<ShardingState>::lockShared( - OperationContext* opCtx, ShardingState* state) { - return ShardingStateLock(opCtx, state, MODE_IS); -} - -template <class ShardingState> -ShardingStateLock<ShardingState> ShardingStateLock<ShardingState>::lockExclusive( - OperationContext* opCtx, ShardingState* state) { - return ShardingStateLock(opCtx, state, MODE_X); -} - -} // namespace mongo diff --git a/src/mongo/db/s/sharding_write_router.cpp b/src/mongo/db/s/sharding_write_router.cpp index bb372f85572..514a2344951 100644 --- a/src/mongo/db/s/sharding_write_router.cpp +++ b/src/mongo/db/s/sharding_write_router.cpp @@ -33,47 +33,48 @@ namespace mongo { ShardingWriteRouter::ShardingWriteRouter(OperationContext* opCtx, const NamespaceString& nss, - CatalogCache* catalogCache) { - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - _css = CollectionShardingState::get(opCtx, nss); - auto collDesc = _css->getCollectionDescription(opCtx); + CatalogCache* catalogCache) + : _scopedCss(CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, nss)), + _collDesc(_scopedCss->getCollectionDescription(opCtx)) { + if (!_collDesc.isSharded()) { + invariant(!_collDesc.getReshardingKeyIfShouldForwardOps()); + return; + } - _reshardKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); - if (_reshardKeyPattern) { - _ownershipFilter = _css->getOwnershipFilter( - opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); - _shardKeyPattern = ShardKeyPattern(collDesc.getKeyPattern()); + _reshardingKeyPattern = _collDesc.getReshardingKeyIfShouldForwardOps(); + if (_reshardingKeyPattern) { + _ownershipFilter = _scopedCss->getOwnershipFilter( + opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); - const auto& reshardingFields = collDesc.getReshardingFields(); - invariant(reshardingFields); - const auto& donorFields = reshardingFields->getDonorFields(); - invariant(donorFields); + const auto& reshardingFields = _collDesc.getReshardingFields(); + invariant(reshardingFields); + const auto& donorFields = reshardingFields->getDonorFields(); + invariant(donorFields); - _reshardingChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo( - opCtx, donorFields->getTempReshardingNss(), true /* allowLocks */)); + _reshardingChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo( + opCtx, donorFields->getTempReshardingNss(), true /* allowLocks */)); - tassert(6862800, - "Routing information for the temporary resharing collection is stale", - _reshardingChunkMgr->isSharded()); - } + tassert(6862800, + "Routing information for the temporary resharing collection is stale", + _reshardingChunkMgr->isSharded()); } } boost::optional<ShardId> ShardingWriteRouter::getReshardingDestinedRecipient( const BSONObj& fullDocument) const { - if (!_reshardKeyPattern) { + if (!_reshardingKeyPattern) { return boost::none; } invariant(_ownershipFilter); - invariant(_shardKeyPattern); invariant(_reshardingChunkMgr); - if (!_ownershipFilter->keyBelongsToMe(_shardKeyPattern->extractShardKeyFromDoc(fullDocument))) { + const auto& shardKeyPattern = _collDesc.getShardKeyPattern(); + if (!_ownershipFilter->keyBelongsToMe(shardKeyPattern.extractShardKeyFromDoc(fullDocument))) { return boost::none; } - auto shardKey = _reshardKeyPattern->extractShardKeyFromDocThrows(fullDocument); + auto shardKey = _reshardingKeyPattern->extractShardKeyFromDocThrows(fullDocument); return _reshardingChunkMgr->findIntersectingChunkWithSimpleCollation(shardKey).getShardId(); } diff --git a/src/mongo/db/s/sharding_write_router.h b/src/mongo/db/s/sharding_write_router.h index ad4e05c78fa..f9d2de749f3 100644 --- a/src/mongo/db/s/sharding_write_router.h +++ b/src/mongo/db/s/sharding_write_router.h @@ -41,17 +41,22 @@ public: CatalogCache* catalogCache); CollectionShardingState* getCss() const { - return _css; + return &(*_scopedCss); + } + + const auto& getCollDesc() const { + return _collDesc; } boost::optional<ShardId> getReshardingDestinedRecipient(const BSONObj& fullDocument) const; private: - CollectionShardingState* _css{nullptr}; + CollectionShardingState::ScopedCollectionShardingState _scopedCss; + ScopedCollectionDescription _collDesc; boost::optional<ScopedCollectionFilter> _ownershipFilter; - boost::optional<ShardKeyPattern> _shardKeyPattern; - boost::optional<ShardKeyPattern> _reshardKeyPattern; + + boost::optional<ShardKeyPattern> _reshardingKeyPattern; boost::optional<ChunkManager> _reshardingChunkMgr; }; diff --git a/src/mongo/db/s/sharding_write_router_bm.cpp b/src/mongo/db/s/sharding_write_router_bm.cpp index b1e5c0197c0..e92d8292052 100644 --- a/src/mongo/db/s/sharding_write_router_bm.cpp +++ b/src/mongo/db/s/sharding_write_router_bm.cpp @@ -157,7 +157,8 @@ std::unique_ptr<CatalogCacheMock> createCatalogCacheMock(OperationContext* opCtx // Configuring the filtering metadata such that calls to getCollectionDescription return what we // want. Specifically the reshardingFields are what we use. Its specified by the chunkManager. - CollectionShardingRuntime::get(opCtx, kNss) + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, kNss, CSRAcquisitionMode::kExclusive) ->setFilteringMetadata(opCtx, CollectionMetadata(chunkManager, originatorShard)); auto catalogCache = CatalogCacheMock::make(); diff --git a/src/mongo/db/s/shardsvr_collmod_participant_command.cpp b/src/mongo/db/s/shardsvr_collmod_participant_command.cpp index f4d561c6ba5..f4853d88a15 100644 --- a/src/mongo/db/s/shardsvr_collmod_participant_command.cpp +++ b/src/mongo/db/s/shardsvr_collmod_participant_command.cpp @@ -102,7 +102,9 @@ public: // operation to refresh the metadata. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); AutoGetCollection autoColl(opCtx, bucketNs, MODE_IX); - CollectionShardingRuntime::get(opCtx, bucketNs)->clearFilteringMetadata(opCtx); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, bucketNs, CSRAcquisitionMode::kExclusive) + ->clearFilteringMetadata(opCtx); } auto service = ShardingRecoveryService::get(opCtx); diff --git a/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp b/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp index 6a32e9c4564..7c94c985870 100644 --- a/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp +++ b/src/mongo/db/s/shardsvr_commit_index_participant_command.cpp @@ -96,11 +96,12 @@ public: txnParticipant); { AutoGetCollection coll(opCtx, ns(), LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, ns()); - uassert( - 6711902, - "The critical section must be taken in order to execute this command", - csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, ns(), CSRAcquisitionMode::kShared); + uassert(6711902, + "The critical section must be taken in order to execute this command", + scopedCsr->getCriticalSectionSignal( + opCtx, ShardingMigrationCriticalSection::kWrite)); } opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); diff --git a/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp b/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp index b5f8d30b1ff..a54b956020c 100644 --- a/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp +++ b/src/mongo/db/s/shardsvr_drop_index_catalog_entry_participant_command.cpp @@ -95,11 +95,12 @@ public: txnParticipant); { AutoGetCollection coll(opCtx, ns(), LockMode::MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, ns()); - uassert( - 6711904, - "The critical section must be taken in order to execute this command", - csr->getCriticalSectionSignal(opCtx, ShardingMigrationCriticalSection::kWrite)); + auto scopedCsr = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, ns(), CSRAcquisitionMode::kShared); + uassert(6711904, + "The critical section must be taken in order to execute this command", + scopedCsr->getCriticalSectionSignal( + opCtx, ShardingMigrationCriticalSection::kWrite)); } opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); diff --git a/src/mongo/db/s/split_chunk.cpp b/src/mongo/db/s/split_chunk.cpp index 234b4272d09..2697c1463a7 100644 --- a/src/mongo/db/s/split_chunk.cpp +++ b/src/mongo/db/s/split_chunk.cpp @@ -27,7 +27,6 @@ * it in the license file. */ - #include "mongo/db/s/split_chunk.h" #include "mongo/base/status_with.h" @@ -52,7 +51,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - namespace mongo { namespace { @@ -103,8 +101,9 @@ bool checkMetadataForSuccessfulSplitChunk(OperationContext* opCtx, Lock::DBLock dbLock(opCtx, nss.dbName(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); - const auto metadataAfterSplit = - CollectionShardingRuntime::get(opCtx, nss)->getCurrentMetadataIfKnown(); + const auto metadataAfterSplit = CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kShared) + ->getCurrentMetadataIfKnown(); ShardId shardId = ShardingState::get(opCtx)->shardId(); diff --git a/src/mongo/db/s/split_vector_test.cpp b/src/mongo/db/s/split_vector_test.cpp index 11fb9eba29a..4ce4db33f2f 100644 --- a/src/mongo/db/s/split_vector_test.cpp +++ b/src/mongo/db/s/split_vector_test.cpp @@ -46,7 +46,9 @@ const std::string kPattern = "_id"; void setUnshardedFilteringMetadata(OperationContext* opCtx, const NamespaceString& nss) { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, nss)->setFilteringMetadata(opCtx, CollectionMetadata()); + CollectionShardingRuntime::assertCollectionLockedAndAcquire( + opCtx, nss, CSRAcquisitionMode::kExclusive) + ->setFilteringMetadata(opCtx, CollectionMetadata()); } class SplitVectorTest : public ShardServerTestFixture { diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h index 70d20e15c15..c2797112686 100644 --- a/src/mongo/s/catalog/type_chunk.h +++ b/src/mongo/s/catalog/type_chunk.h @@ -33,7 +33,6 @@ #include <string> #include "mongo/bson/bsonobj.h" -#include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/shard_id.h" #include "mongo/s/catalog/type_chunk_base_gen.h" |