diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-08-25 01:42:28 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-01 10:50:45 +0000 |
commit | 9cb82d2e8680717d3002459ba5fdb16036183a17 (patch) | |
tree | 6d49e2f6a2bba23707285e90ec1e8b3beba41400 | |
parent | ca4df25002a60910b38bfdd8d71eb5bff5a79b49 (diff) | |
download | mongo-9cb82d2e8680717d3002459ba5fdb16036183a17.tar.gz |
SERVER-50505 Make the CatalogCache return ChunkManager(s) directly
... instead of returning the intermediate CachedCollectionRoutingInfo
class. The ChunkManager should be the only class used for routing.
72 files changed, 737 insertions, 946 deletions
diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp index 6c93d22d318..4e4ddb99af7 100644 --- a/src/mongo/db/commands/feature_compatibility_version.cpp +++ b/src/mongo/db/commands/feature_compatibility_version.cpp @@ -54,8 +54,6 @@ #include "mongo/db/write_concern_options.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" #include "mongo/transport/service_entry_point.h" namespace mongo { diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 2f7a8e06cb5..48b3c04c5b7 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -58,7 +58,6 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/database_version_helpers.h" -#include "mongo/s/grid.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 1d4480cb4ba..7a859ccbbd6 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -52,8 +52,6 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index cadf989486d..c42def9ca86 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -57,8 +57,6 @@ #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/update/update_oplog_entry_version.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" namespace mongo { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 88429000f86..c7b8bd273ec 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -33,13 +33,10 @@ #include "mongo/db/pipeline/pipeline_d.h" -#include <memory> - #include "mongo/base/exact_cast.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -75,19 +72,12 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" -#include "mongo/db/transaction_participant.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_version.h" -#include "mongo/s/grid.h" #include "mongo/s/query/document_source_merge_cursors.h" -#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/time_support.h" namespace mongo { diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index 9faf9b11eaa..330ef41693e 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -47,7 +47,6 @@ #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/util/net/socket_utils.h" @@ -139,11 +138,10 @@ std::vector<BSONObj> CommonProcessInterface::getCurrentOps( std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsRouter( OperationContext* opCtx, const NamespaceString& nss) const { - auto cri = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (auto chunkManager = cri.cm()) { - return _shardKeyToDocumentKeyFields( - chunkManager->getShardKeyPattern().getKeyPatternFields()); + if (cm.isSharded()) { + return _shardKeyToDocumentKeyFields(cm.getShardKeyPattern().getKeyPatternFields()); } // We have no evidence this collection is sharded, so the document key is just _id. @@ -187,12 +185,12 @@ bool CommonProcessInterface::keyPatternNamesExactPaths(const BSONObj& keyPattern boost::optional<ChunkVersion> CommonProcessInterface::refreshAndGetCollectionVersion( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const { const bool forceRefreshFromThisThread = false; - auto routingInfo = uassertStatusOK( + auto cm = uassertStatusOK( Grid::get(expCtx->opCtx) ->catalogCache() ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread)); - if (auto chunkManager = routingInfo.cm()) { - return chunkManager->getVersion(); + if (cm.isSharded()) { + return cm.getVersion(); } return boost::none; } diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 42d05465291..92b87ccb5fc 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -56,25 +56,19 @@ #include "mongo/util/fail_point.h" namespace mongo { - -using boost::intrusive_ptr; -using std::shared_ptr; -using std::string; -using std::unique_ptr; - namespace { /** * Returns the routing information for the namespace set on the passed ExpressionContext. Also * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry. */ -StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( - const intrusive_ptr<ExpressionContext>& expCtx) { +StatusWith<ChunkManager> getCollectionRoutingInfo( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); // Additionally check that the ExpressionContext's UUID matches the collection routing info. - if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().cm()) { - if (!swRoutingInfo.getValue().cm()->uuidMatches(*expCtx->uuid)) { + if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().isSharded()) { + if (!swRoutingInfo.getValue().uuidMatches(*expCtx->uuid)) { return {ErrorCodes::NamespaceNotFound, str::stream() << "The UUID of collection " << expCtx->ns.ns() << " changed; it may have been dropped and re-created."}; @@ -158,10 +152,10 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( str::stream() << "Looking up document matching " << redact(filter.toBson()), [&]() -> std::vector<RemoteCursor> { // Verify that the collection exists, with the correct UUID. - auto routingInfo = uassertStatusOK(getCollectionRoutingInfo(foreignExpCtx)); + auto cm = uassertStatusOK(getCollectionRoutingInfo(foreignExpCtx)); // Finalize the 'find' command object based on the routing table information. - if (findCmdIsByUuid && routingInfo.cm()) { + if (findCmdIsByUuid && cm.isSharded()) { // Find by UUID and shard versioning do not work together (SERVER-31946). In // the sharded case we've already checked the UUID, so find by namespace is // safe. In the unlikely case that the collection has been deleted and a new @@ -176,12 +170,8 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( // single shard will be targeted here; however, in certain cases where only the _id // is present, we may need to scatter-gather the query to all shards in order to // find the document. - auto requests = getVersionedRequestsForTargetedShards(expCtx->opCtx, - nss, - routingInfo, - findCmd, - filterObj, - CollationSpec::kSimpleSpec); + auto requests = getVersionedRequestsForTargetedShards( + expCtx->opCtx, nss, cm, findCmd, filterObj, CollationSpec::kSimpleSpec); // Dispatch the requests. The 'establishCursors' method conveniently prepares the // result into a vector of cursor responses for us. @@ -279,7 +269,7 @@ void MongosProcessInterface::_reportCurrentOpsForTransactionCoordinators( OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {}; std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( - const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { + const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { invariant(hasGlobalServiceContext()); auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); invariant(cursorManager); @@ -287,9 +277,9 @@ std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( } bool MongosProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfo = + auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - return static_cast<bool>(routingInfo.cm()); + return cm.isSharded(); } bool MongosProcessInterface::fieldsHaveSupportingUniqueIndex( diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index f7356c6ded7..b93a255eb30 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -63,9 +63,9 @@ ShardServerProcessInterface::ShardServerProcessInterface( } bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - return static_cast<bool>(routingInfo.cm()); + return cm.isSharded(); } void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( @@ -85,17 +85,17 @@ ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(Operati invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); auto* const catalogCache = Grid::get(opCtx)->catalogCache(); - auto swCollectionRoutingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); - if (swCollectionRoutingInfo.isOK()) { - auto cm = swCollectionRoutingInfo.getValue().cm(); - if (cm && cm->uuidMatches(uuid)) { + auto swCM = catalogCache->getCollectionRoutingInfo(opCtx, nss); + if (swCM.isOK()) { + const auto& cm = swCM.getValue(); + if (cm.isSharded() && cm.uuidMatches(uuid)) { // Unpack the shard key. Collection is now sharded so the document key fields will never // change, mark as final. - return {_shardKeyToDocumentKeyFields(cm->getShardKeyPattern().getKeyPatternFields()), + return {_shardKeyToDocumentKeyFields(cm.getShardKeyPattern().getKeyPatternFields()), true}; } - } else if (swCollectionRoutingInfo != ErrorCodes::NamespaceNotFound) { - uassertStatusOK(std::move(swCollectionRoutingInfo)); + } else if (swCM != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(std::move(swCM)); } // An unsharded collection can still become sharded so is not final. If the uuid doesn't match diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index e9b0e3f48ae..e4275b4a3d9 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -154,25 +154,23 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, return cmdForShards.freeze().toBson(); } -std::vector<RemoteCursor> establishShardCursors( - OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor> executor, - const NamespaceString& nss, - bool hasChangeStream, - boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const std::set<ShardId>& shardIds, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref) { +std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& nss, + bool mustRunOnAll, + boost::optional<ChunkManager>& cm, + const std::set<ShardId>& shardIds, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref) { LOGV2_DEBUG(20904, 1, "Dispatching command {cmdObj} to establish cursors on shards", "cmdObj"_attr = redact(cmdObj)); - const bool mustRunOnAll = mustRunOnAllShards(nss, hasChangeStream); std::vector<std::pair<ShardId, BSONObj>> requests; // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo || mustRunOnAll); + invariant(cm || mustRunOnAll); if (mustRunOnAll) { // The pipeline contains a stage which must be run on all shards. Skip versioning and @@ -180,22 +178,21 @@ std::vector<RemoteCursor> establishShardCursors( for (const auto& shardId : shardIds) { requests.emplace_back(shardId, cmdObj); } - } else if (routingInfo->cm()) { + } else if (cm->isSharded()) { // The collection is sharded. Use the routing table to decide which shards to target // based on the query and collation, and build versioned requests for them. for (const auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + auto versionedCmdObj = appendShardVersion(cmdObj, cm->getVersion(shardId)); requests.emplace_back(shardId, std::move(versionedCmdObj)); } } else { // The collection is unsharded. Target only the primary shard for the database. // Don't append shard version info when contacting the config servers. - const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig() + const auto cmdObjWithShardVersion = cm->dbPrimary() != ShardRegistry::kConfigServerShardId ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) : cmdObj; - requests.emplace_back(routingInfo->db().primaryId(), - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo->db())); + requests.emplace_back(cm->dbPrimary(), + appendDbVersionIfPresent(cmdObjWithShardVersion, cm->dbVersion())); } if (MONGO_unlikely(shardedAggregateHangBeforeEstablishingShardCursors.shouldFail())) { @@ -218,7 +215,7 @@ std::vector<RemoteCursor> establishShardCursors( std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expCtx, bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const boost::optional<ChunkManager>& cm, const BSONObj shardQuery, const BSONObj collation) { if (mustRunOnAllShards) { @@ -228,10 +225,8 @@ std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expC return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())}; } - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo); - - return getTargetedShardsForQuery(expCtx, *routingInfo, shardQuery, collation); + invariant(cm); + return getTargetedShardsForQuery(expCtx, *cm, shardQuery, collation); } /** @@ -657,9 +652,9 @@ boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationConte return boost::none; } - const auto routingInfo = + const auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, mergeStage->getOutputNs())); - if (!routingInfo.cm()) { + if (!cm.isSharded()) { return boost::none; } @@ -671,7 +666,7 @@ boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationConte // inserted on. With this ability we can insert an exchange on the shards to partition the // documents based on which shard will end up owning them. Then each shard can perform a merge // of only those documents which belong to it (optimistically, barring chunk migrations). - return walkPipelineBackwardsTrackingShardKey(opCtx, mergePipeline, *routingInfo.cm()); + return walkPipelineBackwardsTrackingShardKey(opCtx, mergePipeline, cm); } SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { @@ -794,7 +789,7 @@ DispatchShardPipelineResults dispatchShardPipeline( auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() ? std::move(executionNsRoutingInfoStatus.getValue()) - : boost::optional<CachedCollectionRoutingInfo>{}; + : boost::optional<ChunkManager>{}; // Determine whether we can run the entire aggregation on a single shard. const auto collationObj = expCtx->getCollatorBSON(); @@ -808,7 +803,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // - The pipeline contains one or more stages which must always merge on mongoS. const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || (needsPrimaryShardMerge && executionNsRoutingInfo && - *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); + *(shardIds.begin()) != executionNsRoutingInfo->dbPrimary())); boost::optional<ShardedExchangePolicy> exchangeSpec; boost::optional<SplitPipeline> splitPipelines; @@ -898,7 +893,7 @@ DispatchShardPipelineResults dispatchShardPipeline( cursors = establishShardCursors(opCtx, expCtx->mongoProcessInterface->taskExecutor, expCtx->ns, - hasChangeStream, + mustRunOnAll, executionNsRoutingInfo, shardIds, targetedCommand, @@ -926,7 +921,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // must increment the number of involved shards. CurOp::get(opCtx)->debug().nShards = shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && - !shardIds.count(executionNsRoutingInfo->db().primaryId())); + !shardIds.count(executionNsRoutingInfo->dbPrimary())); return DispatchShardPipelineResults{needsPrimaryShardMerge, std::move(ownedCursors), @@ -1099,8 +1094,8 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) { return BSON("pipeline" << explainBuilder.done()); } -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss) { +StatusWith<ChunkManager> getExecutionNsRoutingInfo(OperationContext* opCtx, + const NamespaceString& execNss) { // First, verify that there are shards present in the cluster. If not, then we return the // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on @@ -1109,12 +1104,11 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte // aggregations do when the database does not exist. std::vector<ShardId> shardIds; Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - if (shardIds.size() == 0) { + if (shardIds.empty()) { return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; } - // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not - // exist. + // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not exist return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index b537406cb6e..13a20fee607 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -35,8 +35,6 @@ #include "mongo/s/query/owned_remote_cursor.h" namespace mongo { -class CachedCollectionRoutingInfo; - namespace sharded_agg_helpers { /** @@ -172,8 +170,8 @@ Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, * Returns 'ShardNotFound' or 'NamespaceNotFound' if there are no shards in the cluster or if * collection 'execNss' does not exist, respectively. */ -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss); +StatusWith<ChunkManager> getExecutionNsRoutingInfo(OperationContext* opCtx, + const NamespaceString& execNss); /** * Returns true if an aggregation over 'nss' must run on all shards. diff --git a/src/mongo/db/s/README.md b/src/mongo/db/s/README.md index b4737b57764..bf23835067c 100644 --- a/src/mongo/db/s/README.md +++ b/src/mongo/db/s/README.md @@ -99,11 +99,9 @@ collection or database. A full refresh occurs when: * [The CatalogCache (routing table cache) class](https://github.com/mongodb/mongo/blob/master/src/mongo/s/catalog_cache.h) * [The CachedDatabaseInfo class](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L61-L81) -* [The CachedCollectionRoutingInfo class](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L83-L119) Methods that will mark routing table cache information as stale (sharded collection). -* [onStaleShardVersion](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L207-L213) * [invalidateShardOrEntireCollectionEntryForShardedCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L226-L236) * [invalidateShardForShardedCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L262-L268) * [invalidateEntriesThatReferenceShard](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L270-L274) diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index d91670c60d8..8cdcbc7c8ec 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -50,7 +50,6 @@ #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/balancer_collection_status_gen.h" @@ -612,13 +611,13 @@ Status Balancer::_splitChunksIfNeeded(OperationContext* opCtx) { return routingInfoStatus.getStatus(); } - auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); auto splitStatus = shardutil::splitChunkAtMultiplePoints(opCtx, splitInfo.shardId, splitInfo.nss, - cm->getShardKeyPattern(), + cm.getShardKeyPattern(), splitInfo.collectionVersion, ChunkRange(splitInfo.minKey, splitInfo.maxKey), splitInfo.splitKeys); @@ -701,17 +700,16 @@ int Balancer::_moveChunks(OperationContext* opCtx, void Balancer::_splitOrMarkJumbo(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); - auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey); + auto chunk = cm.findIntersectingChunkWithSimpleCollation(minKey); try { const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( opCtx, chunk.getShardId(), nss, - cm->getShardKeyPattern(), + cm.getShardKeyPattern(), ChunkRange(chunk.getMin(), chunk.getMax()), Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), boost::none)); @@ -747,8 +745,8 @@ void Balancer::_splitOrMarkJumbo(OperationContext* opCtx, shardutil::splitChunkAtMultiplePoints(opCtx, chunk.getShardId(), nss, - cm->getShardKeyPattern(), - cm->getVersion(), + cm.getShardKeyPattern(), + cm.getVersion(), ChunkRange(chunk.getMin(), chunk.getMax()), splitPoints)); } catch (const DBException&) { diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index e41e0f977f1..d1aa8aae5a6 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -65,7 +65,10 @@ namespace { * distribution and chunk placement information which is needed by the balancer policy. */ StatusWith<DistributionStatus> createCollectionDistributionStatus( - OperationContext* opCtx, const ShardStatisticsVector& allShards, const ChunkManager& chunkMgr) { + OperationContext* opCtx, + const NamespaceString& nss, + const ShardStatisticsVector& allShards, + const ChunkManager& chunkMgr) { ShardToChunksMap shardToChunksMap; // Makes sure there is an entry in shardToChunksMap for every shard, so empty shards will also @@ -76,7 +79,7 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus( chunkMgr.forEachChunk([&](const auto& chunkEntry) { ChunkType chunk; - chunk.setNS(chunkMgr.getns()); + chunk.setNS(nss); chunk.setMin(chunkEntry.getMin()); chunk.setMax(chunkEntry.getMax()); chunk.setJumbo(chunkEntry.isJumbo()); @@ -89,14 +92,14 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus( }); const auto swCollectionTags = - Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, chunkMgr.getns()); + Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, nss); if (!swCollectionTags.isOK()) { return swCollectionTags.getStatus().withContext( - str::stream() << "Unable to load tags for collection " << chunkMgr.getns()); + str::stream() << "Unable to load tags for collection " << nss); } const auto& collectionTags = swCollectionTags.getValue(); - DistributionStatus distribution(chunkMgr.getns(), std::move(shardToChunksMap)); + DistributionStatus distribution(nss, std::move(shardToChunksMap)); // Cache the collection tags const auto& keyPattern = chunkMgr.getShardKeyPattern().getKeyPattern(); @@ -182,16 +185,16 @@ private: * Populates splitCandidates with chunk and splitPoint pairs for chunks that violate tag * range boundaries. */ -void getSplitCandidatesToEnforceTagRanges(const ChunkManager* cm, +void getSplitCandidatesToEnforceTagRanges(const ChunkManager& cm, const DistributionStatus& distribution, SplitCandidatesBuffer* splitCandidates) { - const auto& globalMax = cm->getShardKeyPattern().getKeyPattern().globalMax(); + const auto& globalMax = cm.getShardKeyPattern().getKeyPattern().globalMax(); // For each tag range, find chunks that need to be split. for (const auto& tagRangeEntry : distribution.tagRanges()) { const auto& tagRange = tagRangeEntry.second; - const auto chunkAtZoneMin = cm->findIntersectingChunkWithSimpleCollation(tagRange.min); + const auto chunkAtZoneMin = cm.findIntersectingChunkWithSimpleCollation(tagRange.min); invariant(chunkAtZoneMin.getMax().woCompare(tagRange.min) > 0); if (chunkAtZoneMin.getMin().woCompare(tagRange.min)) { @@ -202,7 +205,7 @@ void getSplitCandidatesToEnforceTagRanges(const ChunkManager* cm, if (!tagRange.max.woCompare(globalMax)) continue; - const auto chunkAtZoneMax = cm->findIntersectingChunkWithSimpleCollation(tagRange.max); + const auto chunkAtZoneMax = cm.findIntersectingChunkWithSimpleCollation(tagRange.max); // We need to check that both the chunk's minKey does not match the zone's max and also that // the max is not equal, which would only happen in the case of the zone ending in MaxKey. @@ -221,11 +224,11 @@ void getSplitCandidatesToEnforceTagRanges(const ChunkManager* cm, * splitCandidates with chunk and splitPoint pairs for chunks that need to split. */ void getSplitCandidatesForSessionsCollection(OperationContext* opCtx, - const ChunkManager* cm, + const ChunkManager& cm, SplitCandidatesBuffer* splitCandidates) { const auto minNumChunks = minNumChunksForSessionsCollection.load(); - if (cm->numChunks() >= minNumChunks) { + if (cm.numChunks() >= minNumChunks) { return; } @@ -256,7 +259,7 @@ void getSplitCandidatesForSessionsCollection(OperationContext* opCtx, // For each split point, find a chunk that needs to be split. for (auto& splitPoint : splitPoints) { - const auto chunkAtSplitPoint = cm->findIntersectingChunkWithSimpleCollation(splitPoint); + const auto chunkAtSplitPoint = cm.findIntersectingChunkWithSimpleCollation(splitPoint); invariant(chunkAtSplitPoint.getMax().woCompare(splitPoint) > 0); if (chunkAtSplitPoint.getMin().woCompare(splitPoint)) { @@ -458,16 +461,17 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* op const auto& shardStats = shardStatsStatus.getValue(); + const auto& nss = chunk.getNS(); + auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - chunk.getNS()); + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss); if (!routingInfoStatus.isOK()) { return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -485,18 +489,19 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt return shardStatsStatus.getStatus(); } + const auto& nss = chunk.getNS(); + auto shardStats = std::move(shardStatsStatus.getValue()); auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - chunk.getNS()); + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss); if (!routingInfoStatus.isOK()) { return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -527,9 +532,9 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidate return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -537,7 +542,7 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidate const DistributionStatus& distribution = collInfoStatus.getValue(); // Accumulate split points for the same chunk together - SplitCandidatesBuffer splitCandidates(nss, cm->getVersion()); + SplitCandidatesBuffer splitCandidates(nss, cm.getVersion()); if (nss == NamespaceString::kLogicalSessionsNamespace) { if (!distribution.tags().empty()) { @@ -565,11 +570,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern(); + const auto& shardKeyPattern = cm.getShardKeyPattern().getKeyPattern(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -579,7 +584,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi for (const auto& tagRangeEntry : distribution.tagRanges()) { const auto& tagRange = tagRangeEntry.second; - const auto chunkAtZoneMin = cm->findIntersectingChunkWithSimpleCollation(tagRange.min); + const auto chunkAtZoneMin = cm.findIntersectingChunkWithSimpleCollation(tagRange.min); if (chunkAtZoneMin.getMin().woCompare(tagRange.min)) { return {ErrorCodes::IllegalOperation, @@ -595,7 +600,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi if (!tagRange.max.woCompare(shardKeyPattern.globalMax())) continue; - const auto chunkAtZoneMax = cm->findIntersectingChunkWithSimpleCollation(tagRange.max); + const auto chunkAtZoneMax = cm.findIntersectingChunkWithSimpleCollation(tagRange.max); // We need to check that both the chunk's minKey does not match the zone's max and also that // the max is not equal, which would only happen in the case of the zone ending in MaxKey. diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 923cc9d9151..3169b1fb440 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -182,18 +182,15 @@ Status MigrationManager::executeManualMigration( &scopedMigrationRequests) ->get(); - auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( - opCtx, migrateInfo.nss); - if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); + auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( + opCtx, migrateInfo.nss); + if (!swCM.isOK()) { + return swCM.getStatus(); } - auto& routingInfo = routingInfoStatus.getValue(); - - const auto chunk = - routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey); + const auto& cm = swCM.getValue(); + const auto chunk = cm.findIntersectingChunkWithSimpleCollation(migrateInfo.minKey); Status commandStatus = remoteCommandResponse.status; @@ -333,10 +330,9 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, auto& migrateInfos = nssAndMigrateInfos.second; invariant(!migrateInfos.empty()); - auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss); - if (!routingInfoStatus.isOK()) { + auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( + opCtx, nss); + if (!swCM.isOK()) { // This shouldn't happen because the collection was intact and sharded when the previous // config primary was active and the dist locks have been held by the balancer // throughout. Abort migration recovery. @@ -345,11 +341,11 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, "recovery. Abandoning recovery: {error}", "Unable to reload chunk metadata for collection during balancer recovery", "namespace"_attr = nss, - "error"_attr = redact(routingInfoStatus.getStatus())); + "error"_attr = redact(swCM.getStatus())); return; } - auto& routingInfo = routingInfoStatus.getValue(); + const auto& cm = swCM.getValue(); int scheduledMigrations = 0; @@ -359,8 +355,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, auto waitForDelete = migrationType.getWaitForDelete(); migrateInfos.pop_front(); - const auto chunk = - routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey); + const auto chunk = cm.findIntersectingChunkWithSimpleCollation(migrationInfo.minKey); if (chunk.getShardId() != migrationInfo.from) { // Chunk is no longer on the source shard specified by this migration. Erase the diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index 90c1628ee20..37a6f797064 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -111,14 +111,14 @@ Status splitChunkAtMultiplePoints(OperationContext* opCtx, */ void moveChunk(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { // We need to have the most up-to-date view of the chunk we are about to move. - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::NamespaceNotSharded, "Could not move chunk. Collection is no longer sharded", - routingInfo.cm()); + cm.isSharded()); - const auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(minKey); + const auto suggestedChunk = cm.findIntersectingChunkWithSimpleCollation(minKey); ChunkType chunkToMove; chunkToMove.setNS(nss); @@ -297,18 +297,17 @@ void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSp try { const auto opCtx = cc().makeOperationContext(); - const auto routingInfo = uassertStatusOK( - Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss)); - const auto cm = routingInfo.cm(); + const auto cm = uassertStatusOK( + Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss)); uassert(ErrorCodes::NamespaceNotSharded, "Could not split chunk. Collection is no longer sharded", - cm); + cm.isSharded()); // Best effort checks that the chunk we're splitting hasn't changed bounds or moved shards // since the auto split task was scheduled. Best effort because the chunk metadata may // change after this point. - const auto chunk = cm->findIntersectingChunkWithSimpleCollation(min); + const auto chunk = cm.findIntersectingChunkWithSimpleCollation(min); uassert(4860100, "Chunk to be auto split has different boundaries than when the split was initiated", chunk.getRange() == ChunkRange(min, max)); @@ -316,7 +315,7 @@ void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSp "Chunk to be auto split isn't owned by this shard", ShardingState::get(opCtx.get())->shardId() == chunk.getShardId()); - const auto& shardKeyPattern = cm->getShardKeyPattern(); + const auto& shardKeyPattern = cm.getShardKeyPattern(); const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration(); // Ensure we have the most up-to-date balancer configuration @@ -395,7 +394,7 @@ void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSp chunk.getShardId(), nss, shardKeyPattern, - cm->getVersion(), + cm.getVersion(), chunk.getRange(), splitPoints)); chunkSplitStateDriver->commitSplit(); diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index c275530b384..7da06febea4 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -96,7 +96,7 @@ protected: return std::vector<ChunkType>{chunk1, chunk2, chunk3, chunk4}; }()); - ChunkManager cm(rt, boost::none); + ChunkManager cm(ShardId("0"), DatabaseVersion(UUID::gen(), 1), rt, boost::none); ASSERT_EQ(4, cm.numChunks()); { diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index eaa5d5bcf79..1cb96520d96 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -82,7 +82,8 @@ std::unique_ptr<CollectionMetadata> makeCollectionMetadataImpl( UUID uuid(UUID::gen()); auto rt = RoutingTableHistory::makeNew(kNss, uuid, shardKeyPattern, nullptr, false, epoch, allChunks); - return std::make_unique<CollectionMetadata>(ChunkManager(rt, kChunkManager), kThisShard); + return std::make_unique<CollectionMetadata>( + ChunkManager(kThisShard, DatabaseVersion(UUID::gen(), 1), rt, kChunkManager), kThisShard); } struct ConstructedRangeMap : public RangeMap { diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index 58415ae97eb..77fc9616d93 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -52,6 +52,8 @@ CollectionMetadata makeShardedMetadata(OperationContext* opCtx, UUID uuid = UUID auto range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); auto chunk = ChunkType(kTestNss, std::move(range), ChunkVersion(1, 0, epoch), ShardId("other")); ChunkManager cm( + ShardId("0"), + DatabaseVersion(UUID::gen(), 1), RoutingTableHistory::makeNew( kTestNss, uuid, kShardKeyPattern, nullptr, false, epoch, {std::move(chunk)}), boost::none); diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp index 8710cb63fcd..977fd9640b0 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.cpp +++ b/src/mongo/db/s/config/config_server_test_fixture.cpp @@ -68,7 +68,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/database_version_helpers.h" -#include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/shard_id.h" diff --git a/src/mongo/db/s/config/configsvr_add_shard_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_command.cpp index 058a43b18de..c7003caddb0 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_command.cpp @@ -44,15 +44,12 @@ #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_shard.h" -#include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_request_type.h" #include "mongo/util/str.h" namespace mongo { namespace { -using std::string; - const long long kMaxSizeMBDefault = 0; /** @@ -122,7 +119,7 @@ public: parsedRequest.hasMaxSize() ? parsedRequest.getMaxSize() : kMaxSizeMBDefault); - StatusWith<string> addShardResult = ShardingCatalogManager::get(opCtx)->addShard( + StatusWith<std::string> addShardResult = ShardingCatalogManager::get(opCtx)->addShard( opCtx, parsedRequest.hasName() ? &parsedRequest.getName() : nullptr, parsedRequest.getConnString(), diff --git a/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp index 42a9eef532c..217511db4b7 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp @@ -38,7 +38,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/s/config/sharding_catalog_manager.h" -#include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_to_zone_request_type.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index b9bc5987b04..7ecbb51e33f 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -91,7 +91,7 @@ public: validateZones(request().getZones().get(), authoritativeTags); } - const auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); @@ -112,8 +112,7 @@ public: chunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern()); numInitialChunks = chunks.size(); } else { - numInitialChunks = - request().getNumInitialChunks().get_value_or(routingInfo.cm()->numChunks()); + numInitialChunks = request().getNumInitialChunks().get_value_or(cm.numChunks()); } } diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index 7f83dfaca1d..22a0e23282d 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -42,13 +42,13 @@ #include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/logv2/log.h" -#include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" #include "mongo/util/time_support.h" namespace mongo { namespace { + using TaskExecutor = executor::TaskExecutor; using CallbackArgs = TaskExecutor::CallbackArgs; @@ -136,9 +136,9 @@ std::shared_ptr<ScopedCollectionDescription::Impl> MetadataManager::getActiveMet lg, shared_from_this(), std::move(activeMetadataTracker)); } - auto chunkManager = activeMetadata->getChunkManager(); - ChunkManager chunkManagerAtClusterTime = - ChunkManager(chunkManager->getRoutingHistory(), atClusterTime->asTimestamp()); + auto cm = activeMetadata->getChunkManager(); + ChunkManager chunkManagerAtClusterTime = ChunkManager( + cm->dbPrimary(), cm->dbVersion(), cm->getRoutingHistory(), atClusterTime->asTimestamp()); class MetadataAtTimestamp : public ScopedCollectionDescription::Impl { public: diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index 6ec878a9bc1..d0eb10912ae 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -87,7 +87,8 @@ protected: epoch, {ChunkType{kNss, range, ChunkVersion(1, 0, epoch), kOtherShard}}); - return CollectionMetadata(ChunkManager(rt, boost::none), kThisShard); + return CollectionMetadata( + ChunkManager(kThisShard, DatabaseVersion(UUID::gen(), 1), rt, boost::none), kThisShard); } /** @@ -129,7 +130,8 @@ protected: auto rt = cm->getRoutingHistory()->makeUpdated(splitChunks); - return CollectionMetadata(ChunkManager(rt, boost::none), kThisShard); + return CollectionMetadata(ChunkManager(cm->dbPrimary(), cm->dbVersion(), rt, boost::none), + kThisShard); } static CollectionMetadata cloneMetadataMinusChunk(const CollectionMetadata& metadata, @@ -150,7 +152,8 @@ protected: auto rt = cm->getRoutingHistory()->makeUpdated( {ChunkType(kNss, ChunkRange(minKey, maxKey), chunkVersion, kOtherShard)}); - return CollectionMetadata(ChunkManager(rt, boost::none), kThisShard); + return CollectionMetadata(ChunkManager(cm->dbPrimary(), cm->dbVersion(), rt, boost::none), + kThisShard); } std::shared_ptr<MetadataManager> _manager; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 7d8b0d1c6f3..fe92a9dada3 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -164,7 +164,11 @@ protected: CollectionShardingRuntime::get(operationContext(), kNss) ->setFilteringMetadata( operationContext(), - CollectionMetadata(ChunkManager(rt, boost::none), ShardId("dummyShardId"))); + CollectionMetadata(ChunkManager(ShardId("dummyShardId"), + DatabaseVersion(UUID::gen(), 1), + rt, + boost::none), + ShardId("dummyShardId"))); }(); _client->createIndex(kNss.ns(), kShardKeyPattern); diff --git a/src/mongo/db/s/move_timing_helper.cpp b/src/mongo/db/s/move_timing_helper.cpp index dab096d5511..f3270f67986 100644 --- a/src/mongo/db/s/move_timing_helper.cpp +++ b/src/mongo/db/s/move_timing_helper.cpp @@ -37,7 +37,7 @@ #include "mongo/db/curop.h" #include "mongo/db/s/sharding_logging.h" #include "mongo/logv2/log.h" -#include "mongo/s/grid.h" +#include "mongo/s/catalog/sharding_catalog_client.h" namespace mongo { diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 5e7a4167185..521c4bf7b4f 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -55,7 +55,7 @@ const auto getIsMigrating = OperationContext::declareDecoration<bool>(); * restarted. */ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, - CollectionMetadata const& metadata, + const CollectionMetadata& metadata, const BSONObj& doc) { const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); if (!atClusterTime) @@ -64,8 +64,9 @@ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, auto shardKey = metadata.getShardKeyPattern().extractShardKeyFromDoc(doc); // We can assume the simple collation because shard keys do not support non-simple collations. - ChunkManager chunkManagerAtClusterTime(metadata.getChunkManager()->getRoutingHistory(), - atClusterTime->asTimestamp()); + auto cm = metadata.getChunkManager(); + ChunkManager chunkManagerAtClusterTime( + cm->dbPrimary(), cm->dbVersion(), cm->getRoutingHistory(), atClusterTime->asTimestamp()); auto chunk = chunkManagerAtClusterTime.findIntersectingChunkWithSimpleCollation(shardKey); // Throws if the chunk has moved since the timestamp of the running transaction's atClusterTime diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp index 0d8597e9e98..a282455af15 100644 --- a/src/mongo/db/s/op_observer_sharding_test.cpp +++ b/src/mongo/db/s/op_observer_sharding_test.cpp @@ -66,7 +66,9 @@ CollectionMetadata makeAMetadata(BSONObj const& keyPattern) { auto rt = RoutingTableHistory::makeNew( kTestNss, UUID::gen(), KeyPattern(keyPattern), nullptr, false, epoch, {std::move(chunk)}); - return CollectionMetadata(ChunkManager(rt, Timestamp(100, 0)), ShardId("this")); + return CollectionMetadata( + ChunkManager(ShardId("this"), DatabaseVersion(UUID::gen(), 1), rt, Timestamp(100, 0)), + ShardId("this")); } class DeleteStateTest : public ShardServerTestFixture {}; diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index a8bdd46cf56..4af4b139430 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -104,9 +104,12 @@ public: AutoGetDb autoDb(operationContext(), kNss.db(), MODE_IX); Lock::CollectionLock collLock(operationContext(), kNss, MODE_IX); CollectionShardingRuntime::get(operationContext(), kNss) - ->setFilteringMetadata( - operationContext(), - CollectionMetadata(ChunkManager(rt, boost::none), ShardId("dummyShardId"))); + ->setFilteringMetadata(operationContext(), + CollectionMetadata(ChunkManager(ShardId("dummyShardId"), + DatabaseVersion(UUID::gen(), 1), + rt, + boost::none), + ShardId("dummyShardId"))); } UUID uuid() const { diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 1702c48ff8c..ab219ed34fb 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -321,14 +321,14 @@ CollectionMetadata forceGetCurrentMetadata(OperationContext* opCtx, const Namesp auto* const shardingState = ShardingState::get(opCtx); invariant(shardingState->canAcceptShardedCommands()); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss, true)); - if (!routingInfo.cm()) { + if (!cm.isSharded()) { return CollectionMetadata(); } - return CollectionMetadata(*routingInfo.cm(), shardingState->shardId()); + return CollectionMetadata(cm, shardingState->shardId()); } ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, @@ -344,13 +344,12 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, auto* const shardingState = ShardingState::get(opCtx); invariant(shardingState->canAcceptShardedCommands()); - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh( opCtx, nss, forceRefreshFromThisThread)); - auto cm = routingInfo.cm(); - if (!cm) { - // No chunk manager, so unsharded. Avoid using AutoGetCollection() as it returns the + if (!cm.isSharded()) { + // The collection is not sharded. Avoid using AutoGetCollection() as it returns the // InvalidViewDefinition error code if an invalid view is in the 'system.views' collection. AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); @@ -373,8 +372,8 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, if (optMetadata) { const auto& metadata = *optMetadata; if (metadata.isSharded() && - metadata.getCollVersion().epoch() == cm->getVersion().epoch() && - metadata.getCollVersion() >= cm->getVersion()) { + metadata.getCollVersion().epoch() == cm.getVersion().epoch() && + metadata.getCollVersion() >= cm.getVersion()) { LOGV2_DEBUG( 22063, 1, @@ -384,7 +383,7 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, "metadata", "namespace"_attr = nss, "latestCollectionVersion"_attr = metadata.getCollVersion(), - "refreshedCollectionVersion"_attr = cm->getVersion()); + "refreshedCollectionVersion"_attr = cm.getVersion()); return metadata.getShardVersion(); } } @@ -404,8 +403,8 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, if (optMetadata) { const auto& metadata = *optMetadata; if (metadata.isSharded() && - metadata.getCollVersion().epoch() == cm->getVersion().epoch() && - metadata.getCollVersion() >= cm->getVersion()) { + metadata.getCollVersion().epoch() == cm.getVersion().epoch() && + metadata.getCollVersion() >= cm.getVersion()) { LOGV2_DEBUG( 22064, 1, @@ -415,13 +414,13 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, "metadata", "namespace"_attr = nss, "latestCollectionVersion"_attr = metadata.getCollVersion(), - "refreshedCollectionVersion"_attr = cm->getVersion()); + "refreshedCollectionVersion"_attr = cm.getVersion()); return metadata.getShardVersion(); } } } - CollectionMetadata metadata(*cm, shardingState->shardId()); + CollectionMetadata metadata(cm, shardingState->shardId()); const auto newShardVersion = metadata.getShardVersion(); csr->setFilteringMetadata(opCtx, std::move(metadata)); diff --git a/src/mongo/db/s/shard_key_util.cpp b/src/mongo/db/s/shard_key_util.cpp index 2cae3071624..e216f9f682d 100644 --- a/src/mongo/db/s/shard_key_util.cpp +++ b/src/mongo/db/s/shard_key_util.cpp @@ -231,16 +231,16 @@ void ValidationBehaviorsShardCollection::createShardKeyIndex( ValidationBehaviorsRefineShardKey::ValidationBehaviorsRefineShardKey(OperationContext* opCtx, const NamespaceString& nss) : _opCtx(opCtx) { - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "refineCollectionShardKey namespace " << nss.toString() << " is not sharded", - routingInfo.cm()); - const auto minKeyShardId = routingInfo.cm()->getMinKeyShardIdWithSimpleCollation(); + cm.isSharded()); + const auto minKeyShardId = cm.getMinKeyShardIdWithSimpleCollation(); _indexShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId)); - _cm.emplace(*routingInfo.cm()); + _cm = std::move(cm); } std::vector<BSONObj> ValidationBehaviorsRefineShardKey::loadIndexes( diff --git a/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp b/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp index 71bf6d40005..7a9c6636d9b 100644 --- a/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp +++ b/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp @@ -51,7 +51,7 @@ protected: * the expected vector. */ void runBuildVersionedRequestsExpect( - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const BSONObj& query, @@ -59,7 +59,7 @@ protected: const std::vector<AsyncRequestsSender::Request>& expectedRequests) { const auto actualRequests = buildVersionedRequestsForTargetedShards( - operationContext(), kNss, routingInfo, shardsToSkip, cmdObj, query, collation); + operationContext(), kNss, cm, shardsToSkip, cmdObj, query, collation); ASSERT_EQ(expectedRequests.size(), actualRequests.size()); _assertShardIdsMatch(expectedRequests, actualRequests); @@ -112,10 +112,10 @@ TEST_F(BuildVersionedRequestsForTargetedShardsTest, ReturnPrimaryShardForUnshard expectGetDatabaseUnsharded(); expectGetCollectionUnsharded(); - auto routingInfo = future.default_timed_get(); + auto cm = future.default_timed_get(); AsyncRequestsSender::Request expectedRequest{ShardId(_shards[0].getName()), {}}; - runBuildVersionedRequestsExpect(*routingInfo, {}, {}, {}, {}, {expectedRequest}); + runBuildVersionedRequestsExpect(*cm, {}, {}, {}, {}, {expectedRequest}); } TEST_F(BuildVersionedRequestsForTargetedShardsTest, @@ -125,9 +125,9 @@ TEST_F(BuildVersionedRequestsForTargetedShardsTest, expectGetDatabaseUnsharded(); expectGetCollectionUnsharded(); - auto routingInfo = future.default_timed_get(); + auto cm = future.default_timed_get(); - runBuildVersionedRequestsExpect(*routingInfo, {ShardId(_shards[0].getName())}, {}, {}, {}, {}); + runBuildVersionedRequestsExpect(*cm, {ShardId(_shards[0].getName())}, {}, {}, {}, {}); } } // namespace mongo diff --git a/src/mongo/s/catalog/type_shard.cpp b/src/mongo/s/catalog/type_shard.cpp index 325fc4c1c2c..36e8e931536 100644 --- a/src/mongo/s/catalog/type_shard.cpp +++ b/src/mongo/s/catalog/type_shard.cpp @@ -35,7 +35,6 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" -#include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index fd4ef9f5d06..c9bc5853f62 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -180,8 +180,8 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx } } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( - OperationContext* opCtx, const NamespaceString& nss) { +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) { return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo; } @@ -198,8 +198,9 @@ CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationCon } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoAt( - OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime) { +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + Timestamp atClusterTime) { return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo; } @@ -276,12 +277,10 @@ CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt( continue; } - return {CachedCollectionRoutingInfo(nss, - dbInfo, - collEntry->routingInfo - ? boost::optional<ChunkManager>(ChunkManager( - collEntry->routingInfo, atClusterTime)) - : boost::none), + return {ChunkManager(dbInfo.primaryId(), + dbInfo.databaseVersion(), + collEntry->routingInfo, + atClusterTime), refreshActionTaken}; } } @@ -293,7 +292,7 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon return getDatabase(opCtx, dbName); } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh( +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) { auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss); // We want to ensure that we don't join an in-progress refresh because that @@ -308,15 +307,18 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWi return refreshResult.statusWithInfo; } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( +StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfoStatus = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo; - if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().cm()) { + auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo; + if (!swRoutingInfo.isOK()) + return swRoutingInfo; + + auto cri(std::move(swRoutingInfo.getValue())); + if (!cri.isSharded()) return {ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << nss.ns() << " is not sharded."}; - } - return routingInfoStatus; + return cri; } void CatalogCache::onStaleDatabaseVersion(const StringData dbName, @@ -333,48 +335,6 @@ void CatalogCache::onStaleDatabaseVersion(const StringData dbName, } } -void CatalogCache::onStaleShardVersion(CachedCollectionRoutingInfo&& ccriToInvalidate, - const ShardId& staleShardId) { - _stats.countStaleConfigErrors.addAndFetch(1); - - // Ensure the move constructor of CachedCollectionRoutingInfo is invoked in order to clear the - // input argument so it can't be used anymore - auto ccri(ccriToInvalidate); - - if (!ccri._cm) { - // We received StaleShardVersion for a collection we thought was unsharded. The collection - // must have become sharded. - onEpochChange(ccri._nss); - return; - } - - // We received StaleShardVersion for a collection we thought was sharded. Either a migration - // occurred to or from a shard we contacted, or the collection was dropped. - stdx::lock_guard<Latch> lg(_mutex); - - const auto nss = ccri._cm->getns(); - const auto itDb = _collectionsByDb.find(nss.db()); - if (itDb == _collectionsByDb.end()) { - // The database was dropped. - return; - } - - auto itColl = itDb->second.find(nss.ns()); - if (itColl == itDb->second.end()) { - // The collection was dropped. - } else if (itColl->second->needsRefresh && itColl->second->epochHasChanged) { - // If the epoch has changed, this implies that all routing requests have already been - // marked to block behind the next catalog cache refresh. We do not need to mark the shard - // as stale in this case. - return; - } else if (itColl->second->routingInfo->getVersion() == ccri._cm->getVersion()) { - // If the versions match, the last version of the routing information that we used is no - // longer valid, so trigger a refresh. - itColl->second->needsRefresh = true; - itColl->second->routingInfo->setShardStale(staleShardId); - } -} - void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx, bool shouldBlock) { if (gEnableFinerGrainedCatalogCacheRefresh) { @@ -887,9 +847,4 @@ std::string ComparableChunkVersion::toString() const { return toBSON().toString(); } -CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(NamespaceString nss, - CachedDatabaseInfo db, - boost::optional<ChunkManager> cm) - : _nss(std::move(nss)), _db(std::move(db)), _cm(std::move(cm)) {} - } // namespace mongo diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index ca06eb85e6b..a957189183a 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -29,17 +29,13 @@ #pragma once -#include <memory> - #include "mongo/base/string_data.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" -#include "mongo/s/database_version_gen.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/concurrency/with_lock.h" @@ -50,7 +46,6 @@ namespace mongo { class BSONObjBuilder; class CachedDatabaseInfo; -class CachedCollectionRoutingInfo; class OperationContext; static constexpr int kMaxNumStaleVersionRetries = 10; @@ -242,6 +237,7 @@ public: private: friend class CatalogCache; + CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard); DatabaseType _dbt; @@ -249,45 +245,6 @@ private: }; /** - * Constructed exclusively by the CatalogCache. - * - * This RoutingInfo can be considered a "package" of routing info for the database and for the - * collection. Once unsharded collections are treated as sharded collections with a single chunk, - * they will also have a ChunkManager with a "chunk distribution." At that point, this "package" can - * be dismantled: routing for commands that route by database can directly retrieve the - * CachedDatabaseInfo, while routing for commands that route by collection can directly retrieve the - * ChunkManager. - */ -class CachedCollectionRoutingInfo { -public: - CachedDatabaseInfo db() const { - return _db; - } - - const ChunkManager* cm() const { - return _cm.get_ptr(); - } - -private: - friend class CatalogCache; - friend class CachedDatabaseInfo; - - CachedCollectionRoutingInfo(NamespaceString nss, - CachedDatabaseInfo db, - boost::optional<ChunkManager> cm); - - NamespaceString _nss; - - // Copy of the database's cached info. - CachedDatabaseInfo _db; - - // Shared reference to the collection's cached chunk distribution if sharded, otherwise - // boost::none. This is a shared reference rather than a copy because the chunk distribution can - // be large. - boost::optional<ChunkManager> _cm; -}; - -/** * This is the root of the "read-only" hierarchy of cached catalog metadata. It is read only * in the sense that it only reads from the persistent store, but never writes to it. Instead * writes happen through the ShardingCatalogManager and the cache hierarchy needs to be invalidated. @@ -318,9 +275,9 @@ public: * If the given atClusterTime is so far in the past that it is not possible to construct routing * info, returns a StaleClusterTime error. */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoAt(OperationContext* opCtx, - const NamespaceString& nss, - Timestamp atClusterTime); + StatusWith<ChunkManager> getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + Timestamp atClusterTime); /** * Same as the getCollectionRoutingInfoAt call above, but returns the latest known routing @@ -330,8 +287,8 @@ public: * guaranteed to never return StaleClusterTime, because the latest routing information should * always be available. */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss); + StatusWith<ChunkManager> getCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss); /** * Same as getDatbase above, but in addition forces the database entry to be refreshed. @@ -352,7 +309,7 @@ public: * collection version to decide when a refresh is necessary and provide * proper causal consistency */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoWithRefresh( + StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread = false); @@ -361,8 +318,8 @@ public: * Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a * NamespaceNotSharded error if the collection is not sharded. */ - StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh( - OperationContext* opCtx, const NamespaceString& nss); + StatusWith<ChunkManager> getShardedCollectionRoutingInfoWithRefresh(OperationContext* opCtx, + const NamespaceString& nss); /** * Advances the version in the cache for the given database. @@ -376,14 +333,6 @@ public: const boost::optional<DatabaseVersion>& wantedVersion); /** - * Non-blocking method that marks the current cached collection entry as needing refresh if its - * collectionVersion matches the input's ChunkManager's collectionVersion. - * - * To be called if using the input routing info caused a StaleShardVersion to be received. - */ - void onStaleShardVersion(CachedCollectionRoutingInfo&&, const ShardId& staleShardId); - - /** * Gets whether this operation should block behind a catalog cache refresh. */ static bool getOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx); @@ -470,7 +419,6 @@ public: private: // Make the cache entries friends so they can access the private classes below friend class CachedDatabaseInfo; - friend class CachedCollectionRoutingInfo; /** * Cache entry describing a collection. @@ -573,7 +521,7 @@ private: */ struct RefreshResult { // Status containing result of refresh - StatusWith<CachedCollectionRoutingInfo> statusWithInfo; + StatusWith<ChunkManager> statusWithInfo; RefreshAction actionTaken; }; diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp index de1fb9e7388..27a62bd9749 100644 --- a/src/mongo/s/catalog_cache_refresh_test.cpp +++ b/src/mongo/s/catalog_cache_refresh_test.cpp @@ -117,11 +117,9 @@ TEST_F(CatalogCacheRefreshTest, FullLoad) { chunk4.toConfigBSON()}; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(4, cm->numChunks()); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(4, cm.numChunks()); } TEST_F(CatalogCacheRefreshTest, NoLoadIfShardNotMarkedStaleInOperationContext) { @@ -131,10 +129,9 @@ TEST_F(CatalogCacheRefreshTest, NoLoadIfShardNotMarkedStaleInOperationContext) { ASSERT_EQ(2, initialRoutingInfo.numChunks()); auto futureNoRefresh = scheduleRoutingInfoUnforcedRefresh(kNss); - auto routingInfo = futureNoRefresh.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - ASSERT_EQ(2, cm->numChunks()); + auto cm = *futureNoRefresh.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); } class MockLockerAlwaysReportsToBeLocked : public LockerNoop { @@ -160,12 +157,8 @@ TEST_F(CatalogCacheRefreshTest, DatabaseNotFound) { expectFindSendBSONObjVector(kConfigHostAndPort, {}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - - FAIL(str::stream() << "Returning no database did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + auto cm = *future.default_timed_get(); + FAIL(str::stream() << "Returning no database did not fail and returned " << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NamespaceNotFound, ex.code()); } @@ -181,12 +174,9 @@ TEST_F(CatalogCacheRefreshTest, DatabaseBSONCorrupted) { << "This value should not be in a database config document")}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning corrupted database entry did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code()); } @@ -200,10 +190,9 @@ TEST_F(CatalogCacheRefreshTest, CollectionNotFound) { // Return an empty collection expectFindSendBSONObjVector(kConfigHostAndPort, {}); - auto routingInfo = future.default_timed_get(); - ASSERT(!routingInfo->cm()); - ASSERT(routingInfo->db().primary()); - ASSERT_EQ(ShardId{"0"}, routingInfo->db().primaryId()); + auto cm = *future.default_timed_get(); + ASSERT(!cm.isSharded()); + ASSERT_EQ(ShardId{"0"}, cm.dbPrimary()); } TEST_F(CatalogCacheRefreshTest, CollectionBSONCorrupted) { @@ -218,12 +207,9 @@ TEST_F(CatalogCacheRefreshTest, CollectionBSONCorrupted) { << "This value should not be in a collection config document")}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning corrupted collection entry did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code()); } @@ -248,12 +234,9 @@ TEST_F(CatalogCacheRefreshTest, FullLoadNoChunksFound) { expectFindSendBSONObjVector(kConfigHostAndPort, {}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning no chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -280,12 +263,9 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadNoChunksFound) { expectFindSendBSONObjVector(kConfigHostAndPort, {}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning no chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -313,12 +293,9 @@ TEST_F(CatalogCacheRefreshTest, ChunksBSONCorrupted) { }()); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning no chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code()); } @@ -370,13 +347,10 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithLowestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -428,13 +402,10 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithHighestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -488,13 +459,10 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -547,13 +515,10 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -595,13 +560,10 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) { expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning chunks with different epoch for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -684,14 +646,12 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()}; }); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(3, cm->numChunks()); - ASSERT_EQ(newVersion, cm->getVersion()); - ASSERT_EQ(ChunkVersion(5, 1, newVersion.epoch()), cm->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(5, 2, newVersion.epoch()), cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(3, cm.numChunks()); + ASSERT_EQ(newVersion, cm.getVersion()); + ASSERT_EQ(ChunkVersion(5, 1, newVersion.epoch()), cm.getVersion({"0"})); + ASSERT_EQ(ChunkVersion(5, 2, newVersion.epoch()), cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { @@ -733,14 +693,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(2, cm->numChunks()); - ASSERT_EQ(newVersion, cm->getVersion()); - ASSERT_EQ(ChunkVersion(1, 0, newVersion.epoch()), cm->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(1, 1, newVersion.epoch()), cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); + ASSERT_EQ(newVersion, cm.getVersion()); + ASSERT_EQ(ChunkVersion(1, 0, newVersion.epoch()), cm.getVersion({"0"})); + ASSERT_EQ(ChunkVersion(1, 1, newVersion.epoch()), cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { @@ -778,14 +736,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(2, cm->numChunks()); - ASSERT_EQ(version, cm->getVersion()); - ASSERT_EQ(version, cm->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); + ASSERT_EQ(version, cm.getVersion()); + ASSERT_EQ(version, cm.getVersion({"0"})); + ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMove) { @@ -819,14 +775,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMove) { return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(2, cm->numChunks()); - ASSERT_EQ(version, cm->getVersion()); - ASSERT_EQ(version, cm->getVersion({"0"})); - ASSERT_EQ(expectedDestShardVersion, cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); + ASSERT_EQ(version, cm.getVersion()); + ASSERT_EQ(version, cm.getVersion({"0"})); + ASSERT_EQ(expectedDestShardVersion, cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunk) { @@ -856,14 +810,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunk) { return std::vector<BSONObj>{chunk1.toConfigBSON()}; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(1, cm->numChunks()); - ASSERT_EQ(version, cm->getVersion()); - ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm->getVersion({"0"})); - ASSERT_EQ(version, cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(1, cm.numChunks()); + ASSERT_EQ(version, cm.getVersion()); + ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm.getVersion({"0"})); + ASSERT_EQ(version, cm.getVersion({"1"})); } } // namespace diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index a3254db64ab..ba9bb4a4ddb 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -59,7 +59,7 @@ void CatalogCacheTestFixture::setUp() { CollatorFactoryInterface::set(getServiceContext(), std::make_unique<CollatorFactoryMock>()); } -executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> +executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> CatalogCacheTestFixture::scheduleRoutingInfoForcedRefresh(const NamespaceString& nss) { return launchAsync([this, nss] { auto client = getServiceContext()->makeClient("Test"); @@ -70,13 +70,13 @@ CatalogCacheTestFixture::scheduleRoutingInfoForcedRefresh(const NamespaceString& }); } -executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> +executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> CatalogCacheTestFixture::scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss) { return launchAsync([this, nss] { auto client = getServiceContext()->makeClient("Test"); auto const catalogCache = Grid::get(getServiceContext())->catalogCache(); - return boost::make_optional( + return boost::optional<ChunkManager>( uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss))); }); } @@ -160,11 +160,7 @@ ChunkManager CatalogCacheTestFixture::makeChunkManager( expectFindSendBSONObjVector(kConfigHostAndPort, {collectionBSON}); expectFindSendBSONObjVector(kConfigHostAndPort, initialChunks); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - ASSERT(routingInfo->db().primary()); - - return *routingInfo->cm(); + return *future.default_timed_get(); } void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss, std::string shardId) { @@ -188,13 +184,13 @@ void CatalogCacheTestFixture::expectGetCollection(NamespaceString nss, }()); } -CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards( +ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards( NamespaceString nss) { return loadRoutingTableWithTwoChunksAndTwoShardsImpl(nss, BSON("_id" << 1)); } -CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsHash( +ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsHash( NamespaceString nss) { return loadRoutingTableWithTwoChunksAndTwoShardsImpl(nss, @@ -202,7 +198,7 @@ CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChun << "hashed")); } -CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsImpl( +ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsImpl( NamespaceString nss, const BSONObj& shardKey) { const OID epoch = OID::gen(); const ShardKeyPattern shardKeyPattern(shardKey); @@ -230,7 +226,7 @@ CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChun return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }()); - return future.default_timed_get().get(); + return *future.default_timed_get(); } } // namespace mongo diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h index d17d81e6a93..412e2455911 100644 --- a/src/mongo/s/catalog_cache_test_fixture.h +++ b/src/mongo/s/catalog_cache_test_fixture.h @@ -39,11 +39,6 @@ namespace mongo { -class BSONObj; -class ChunkManager; -class CollatorInterface; -class ShardKeyPattern; - class CatalogCacheTestFixture : public ShardingTestFixture { protected: void setUp() override; @@ -70,7 +65,7 @@ protected: * std::future with the MSVC STL library, which requires the templated type to be default * constructible. */ - executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> + executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> scheduleRoutingInfoForcedRefresh(const NamespaceString& nss); /** @@ -84,7 +79,7 @@ protected: * std::future with the MSVC STL library, which requires the templated type to be default * constructible. */ - executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> + executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss); /** @@ -99,18 +94,18 @@ protected: * Triggers a refresh for the given namespace and mocks network calls to simulate loading * metadata with two chunks: [minKey, 0) and [0, maxKey) on two shards with ids: "0" and "1". */ - CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShards(NamespaceString nss); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShards(NamespaceString nss); /** * Same as the above method but the sharding key is hashed. */ - CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShardsHash(NamespaceString nss); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShardsHash(NamespaceString nss); /** * The common implementation for any shard key. */ - CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShardsImpl( - NamespaceString nss, const BSONObj& shardKey); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString nss, + const BSONObj& shardKey); /** * Mocks network responses for loading a sharded database and collection from the config server. diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 3107a05a7a0..a159b3f0ca0 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -338,7 +338,7 @@ Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj for (BSONElement elt : shardKey) { uassert(ErrorCodes::ShardKeyNotFound, str::stream() << "Cannot target single shard due to collation of key " - << elt.fieldNameStringData() << " for namespace " << getns(), + << elt.fieldNameStringData() << " for namespace " << _rt->nss(), !CollationIndexKey::isCollatableType(elt.type())); } } @@ -347,7 +347,7 @@ Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj uassert(ErrorCodes::ShardKeyNotFound, str::stream() << "Cannot target single shard using key " << shardKey - << " for namespace " << getns(), + << " for namespace " << _rt->nss(), chunkInfo && chunkInfo->containsKey(shardKey)); return Chunk(*chunkInfo, _clusterTime); @@ -370,7 +370,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e const BSONObj& query, const BSONObj& collation, std::set<ShardId>* shardIds) const { - auto qr = std::make_unique<QueryRequest>(_rt->getns()); + auto qr = std::make_unique<QueryRequest>(_rt->nss()); qr->setFilter(query); if (auto uuid = getUUID()) @@ -645,6 +645,10 @@ IndexBounds ChunkManager::collapseQuerySolution(const QuerySolutionNode* node) { return bounds; } +std::string ChunkManager::toString() const { + return _rt ? _rt->toString() : "UNSHARDED"; +} + bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other, const ShardId& shardName) const { // Return true if the shard version is the same in the two chunk managers diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index a5863b064b2..4ac16f5fa18 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -38,6 +38,7 @@ #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" +#include "mongo/s/database_version_gen.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/ticketholder.h" @@ -46,15 +47,13 @@ namespace mongo { class CanonicalQuery; struct QuerySolutionNode; -class OperationContext; class ChunkManager; struct ShardVersionTargetingInfo { - // Indicates whether the shard is stale and thus needs a catalog cache refresh. Is false by - // default. - AtomicWord<bool> isStale; + // Indicates whether the shard is stale and thus needs a catalog cache refresh + AtomicWord<bool> isStale{false}; - // Max chunk version for the shard. + // Max chunk version for the shard ChunkVersion shardVersion; ShardVersionTargetingInfo(const OID& epoch); @@ -64,9 +63,11 @@ struct ShardVersionTargetingInfo { // shard is currently marked as needing a catalog cache refresh (stale). using ShardVersionMap = stdx::unordered_map<ShardId, ShardVersionTargetingInfo, ShardId::Hasher>; -// This class serves as a Facade around how the mapping of ranges to chunks is represented. It also -// provides a simpler, high-level interface for domain specific operations without exposing the -// underlying implementation. +/** + * This class serves as a Facade around how the mapping of ranges to chunks is represented. It also + * provides a simpler, high-level interface for domain specific operations without exposing the + * underlying implementation. + */ class ChunkMap { // Vector of chunks ordered by max key. using ChunkVector = std::vector<std::shared_ptr<ChunkInfo>>; @@ -168,7 +169,7 @@ public: */ std::shared_ptr<RoutingTableHistory> makeUpdated(const std::vector<ChunkType>& changedChunks); - const NamespaceString& getns() const { + const NamespaceString& nss() const { return _nss; } @@ -261,6 +262,8 @@ public: } private: + friend class ChunkManager; + RoutingTableHistory(NamespaceString nss, boost::optional<UUID> uuid, KeyPattern shardKeyPattern, @@ -294,8 +297,6 @@ private: // Note: this declaration must not be moved before _chunkMap since it is initialized by using // the _chunkMap instance. ShardVersionMap _shardVersions; - - friend class ChunkManager; }; /** @@ -303,13 +304,37 @@ private: */ class ChunkManager { public: - ChunkManager(std::shared_ptr<RoutingTableHistory> rt, boost::optional<Timestamp> clusterTime) - : _rt(std::move(rt)), _clusterTime(std::move(clusterTime)) {} + ChunkManager(ShardId dbPrimary, + DatabaseVersion dbVersion, + std::shared_ptr<RoutingTableHistory> rt, + boost::optional<Timestamp> clusterTime) + : _dbPrimary(std::move(dbPrimary)), + _dbVersion(std::move(dbVersion)), + _rt(std::move(rt)), + _clusterTime(std::move(clusterTime)) {} + + // Methods supported on both sharded and unsharded collections + + bool isSharded() const { + return bool(_rt); + } + + const ShardId& dbPrimary() const { + return _dbPrimary; + } + + const DatabaseVersion& dbVersion() const { + return _dbVersion; + } - const NamespaceString& getns() const { - return _rt->getns(); + int numChunks() const { + return _rt ? _rt->numChunks() : 1; } + std::string toString() const; + + // Methods only supported on sharded collections (caller must check isSharded()) + const ShardKeyPattern& getShardKeyPattern() const { return _rt->getShardKeyPattern(); } @@ -345,10 +370,6 @@ public: }); } - int numChunks() const { - return _rt->numChunks(); - } - /** * Returns true if a document with the given "shardKey" is owned by the shard with the given * "shardId" in this routing table. If "shardKey" is empty returns false. If "shardKey" is not a @@ -452,10 +473,6 @@ public: return _rt->compatibleWith(*other._rt, shard); } - std::string toString() const { - return _rt->toString(); - } - bool uuidMatches(UUID uuid) const { return _rt->uuidMatches(uuid); } @@ -469,7 +486,11 @@ public: } private: + ShardId _dbPrimary; + DatabaseVersion _dbVersion; + std::shared_ptr<RoutingTableHistory> _rt; + boost::optional<Timestamp> _clusterTime; }; diff --git a/src/mongo/s/chunk_manager_query_test.cpp b/src/mongo/s/chunk_manager_query_test.cpp index 0e40ed48676..823166d96e9 100644 --- a/src/mongo/s/chunk_manager_query_test.cpp +++ b/src/mongo/s/chunk_manager_query_test.cpp @@ -523,7 +523,8 @@ TEST_F(ChunkManagerQueryTest, SnapshotQueryWithMoreShardsThanLatestMetadata) { ChunkHistory(Timestamp(1, 0), ShardId("1"))}); auto newRoutingTable = oldRoutingTable->makeUpdated({chunk1}); - ChunkManager chunkManager(newRoutingTable, Timestamp(5, 0)); + ChunkManager chunkManager( + ShardId("0"), DatabaseVersion(UUID::gen(), 1), newRoutingTable, Timestamp(5, 0)); std::set<ShardId> shardIds; chunkManager.getShardIdsForRange(BSON("x" << MINKEY), BSON("x" << MAXKEY), &shardIds); diff --git a/src/mongo/s/chunk_manager_refresh_bm.cpp b/src/mongo/s/chunk_manager_refresh_bm.cpp index a5bc812c78a..41a33a964b6 100644 --- a/src/mongo/s/chunk_manager_refresh_bm.cpp +++ b/src/mongo/s/chunk_manager_refresh_bm.cpp @@ -41,6 +41,8 @@ namespace mongo { namespace { +const NamespaceString kNss("test", "foo"); + ChunkRange getRangeForChunk(int i, int nChunks) { invariant(i >= 0); invariant(nChunks > 0); @@ -55,24 +57,27 @@ ChunkRange getRangeForChunk(int i, int nChunks) { } template <typename ShardSelectorFn> -auto makeChunkManagerWithShardSelector(int nShards, uint32_t nChunks, ShardSelectorFn selectShard) { +CollectionMetadata makeChunkManagerWithShardSelector(int nShards, + uint32_t nChunks, + ShardSelectorFn selectShard) { const auto collEpoch = OID::gen(); - const auto collName = NamespaceString("test.foo"); const auto shardKeyPattern = KeyPattern(BSON("_id" << 1)); std::vector<ChunkType> chunks; chunks.reserve(nChunks); for (uint32_t i = 0; i < nChunks; ++i) { - chunks.emplace_back(collName, + chunks.emplace_back(kNss, getRangeForChunk(i, nChunks), ChunkVersion{i + 1, 0, collEpoch}, selectShard(i, nShards, nChunks)); } auto routingTableHistory = RoutingTableHistory::makeNew( - collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, chunks); - return std::make_unique<CollectionMetadata>(ChunkManager(routingTableHistory, boost::none), - ShardId("shard0")); + kNss, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, chunks); + return CollectionMetadata( + ChunkManager( + ShardId("Shard0"), DatabaseVersion(UUID::gen(), 1), routingTableHistory, boost::none), + ShardId("shard0")); } ShardId pessimalShardSelector(int i, int nShards, int nChunks) { @@ -98,22 +103,22 @@ MONGO_COMPILER_NOINLINE auto makeChunkManagerWithOptimalBalancedDistribution(int MONGO_COMPILER_NOINLINE auto runIncrementalUpdate(const CollectionMetadata& cm, const std::vector<ChunkType>& newChunks) { auto rt = cm.getChunkManager()->getRoutingHistory()->makeUpdated(newChunks); - return std::make_unique<CollectionMetadata>(ChunkManager(rt, boost::none), ShardId("shard0")); + return std::make_unique<CollectionMetadata>( + ChunkManager(ShardId("shard0"), DatabaseVersion(UUID::gen(), 1), rt, boost::none), + ShardId("shard0")); } void BM_IncrementalRefreshWithNoChange(benchmark::State& state) { const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeChunkManagerWithOptimalBalancedDistribution(nShards, nChunks); + auto metadata = makeChunkManagerWithOptimalBalancedDistribution(nShards, nChunks); - auto postMoveVersion = cm->getChunkManager()->getVersion(); - const auto collName = NamespaceString(cm->getChunkManager()->getns()); + auto postMoveVersion = metadata.getChunkManager()->getVersion(); std::vector<ChunkType> newChunks; - newChunks.emplace_back( - collName, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); + newChunks.emplace_back(kNss, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); for (auto keepRunning : state) { - benchmark::DoNotOptimize(runIncrementalUpdate(*cm, newChunks)); + benchmark::DoNotOptimize(runIncrementalUpdate(metadata, newChunks)); } } @@ -125,20 +130,17 @@ BENCHMARK(BM_IncrementalRefreshWithNoChange) void BM_IncrementalRefreshOfPessimalBalancedDistribution(benchmark::State& state) { const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeChunkManagerWithPessimalBalancedDistribution(nShards, nChunks); + auto metadata = makeChunkManagerWithPessimalBalancedDistribution(nShards, nChunks); - auto postMoveVersion = cm->getChunkManager()->getVersion(); - const auto collName = NamespaceString(cm->getChunkManager()->getns()); + auto postMoveVersion = metadata.getChunkManager()->getVersion(); std::vector<ChunkType> newChunks; postMoveVersion.incMajor(); - newChunks.emplace_back( - collName, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); + newChunks.emplace_back(kNss, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); postMoveVersion.incMajor(); - newChunks.emplace_back( - collName, getRangeForChunk(3, nChunks), postMoveVersion, ShardId("shard1")); + newChunks.emplace_back(kNss, getRangeForChunk(3, nChunks), postMoveVersion, ShardId("shard1")); for (auto keepRunning : state) { - benchmark::DoNotOptimize(runIncrementalUpdate(*cm, newChunks)); + benchmark::DoNotOptimize(runIncrementalUpdate(metadata, newChunks)); } } @@ -168,8 +170,11 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS for (auto keepRunning : state) { auto routingTableHistory = RoutingTableHistory::makeNew( collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, chunks); - benchmark::DoNotOptimize( - CollectionMetadata(ChunkManager(routingTableHistory, boost::none), ShardId("shard0"))); + benchmark::DoNotOptimize(CollectionMetadata(ChunkManager(ShardId("shard0"), + DatabaseVersion(UUID::gen(), 1), + routingTableHistory, + boost::none), + ShardId("shard0"))); } } @@ -257,13 +262,13 @@ void BM_FindIntersectingChunk(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto keysIter = makeCircularIterator(keys); for (auto keepRunning : state) { benchmark::DoNotOptimize( - cm->getChunkManager()->findIntersectingChunkWithSimpleCollation(*keysIter)); + metadata.getChunkManager()->findIntersectingChunkWithSimpleCollation(*keysIter)); ++keysIter; } @@ -276,14 +281,14 @@ void BM_GetShardIdsForRange(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto ranges = makeRanges(keys); auto rangesIter = makeCircularIterator(ranges); for (auto keepRunning : state) { std::set<ShardId> shardIds; - cm->getChunkManager()->getShardIdsForRange( + metadata.getChunkManager()->getShardIdsForRange( rangesIter->first, rangesIter->second, &shardIds); ++rangesIter; } @@ -297,13 +302,13 @@ void BM_GetShardIdsForRangeMinKeyToMaxKey(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto min = BSON("_id" << MINKEY); auto max = BSON("_id" << MAXKEY); for (auto keepRunning : state) { std::set<ShardId> shardIds; - cm->getChunkManager()->getShardIdsForRange(min, max, &shardIds); + metadata.getChunkManager()->getShardIdsForRange(min, max, &shardIds); } state.SetItemsProcessed(state.iterations()); @@ -315,14 +320,14 @@ void BM_KeyBelongsToMe(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto keysIter = makeCircularIterator(keys); size_t nOwned = 0; for (auto keepRunning : state) { - if (cm->keyBelongsToMe(*keysIter)) { + if (metadata.keyBelongsToMe(*keysIter)) { ++nOwned; } ++keysIter; @@ -338,7 +343,7 @@ void BM_RangeOverlapsChunk(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto ranges = makeRanges(keys); auto rangesIter = makeCircularIterator(ranges); @@ -346,7 +351,7 @@ void BM_RangeOverlapsChunk(benchmark::State& state, size_t nOverlapped = 0; for (auto keepRunning : state) { - if (cm->rangeOverlapsChunk(ChunkRange(rangesIter->first, rangesIter->second))) { + if (metadata.rangeOverlapsChunk(ChunkRange(rangesIter->first, rangesIter->second))) { ++nOverlapped; } ++rangesIter; diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index a282a1c548e..ac9c9900281 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -376,7 +376,7 @@ BSONObj stripWriteConcern(const BSONObj& cmdObj) { std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const BSONObj& query, @@ -384,24 +384,24 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard auto cmdToSend = cmdObj; - if (!routingInfo.cm()) { + if (!cm.isSharded()) { // The collection is unsharded. Target only the primary shard for the database. - const auto primaryShardId = routingInfo.db().primaryId(); + const auto primaryShardId = cm.dbPrimary(); if (shardsToSkip.find(primaryShardId) != shardsToSkip.end()) { return {}; } // Attach shardVersion "UNSHARDED", unless targeting the config server. - const auto cmdObjWithShardVersion = (primaryShardId != "config") + const auto cmdObjWithShardVersion = (primaryShardId != ShardRegistry::kConfigServerShardId) ? appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED()) : cmdToSend; return buildUnshardedRequestsForAllShards( opCtx, {primaryShardId}, - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo.db())); + appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())); } std::vector<AsyncRequestsSender::Request> requests; @@ -415,12 +415,11 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard } auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(collator), nss); - routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); for (const ShardId& shardId : shardIds) { if (shardsToSkip.find(shardId) == shardsToSkip.end()) { - requests.emplace_back( - shardId, appendShardVersion(cmdToSend, routingInfo.cm()->getVersion(shardId))); + requests.emplace_back(shardId, appendShardVersion(cmdToSend, cm.getVersion(shardId))); } } @@ -441,14 +440,14 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, const BSONObj& query, const BSONObj& collation) { const auto requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, {} /* shardsToSkip */, cmdObj, query, collation); + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, query, collation); return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests); } @@ -458,7 +457,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, @@ -466,7 +465,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( const BSONObj& query, const BSONObj& collation) { const auto requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, shardsToSkip, cmdObj, query, collation); + opCtx, nss, cm, shardsToSkip, cmdObj, query, collation); return gatherResponsesNoThrowOnStaleShardVersionErrors( opCtx, dbName, readPref, retryPolicy, requests); @@ -479,18 +478,18 @@ std::vector<AsyncRequestsSender::Response> scatterGatherOnlyVersionIfUnsharded( const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, const std::set<ErrorCodes::Error>& ignorableErrors) { - auto routingInfo = + auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); std::vector<AsyncRequestsSender::Request> requests; - if (routingInfo.cm()) { + if (cm.isSharded()) { // An unversioned request on a sharded collection can cause a shard that has not owned data // for the collection yet to implicitly create the collection without all the collection // options. So, we signal to shards that they should not implicitly create the collection. requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj); } else { requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, {} /* shardsToSkip */, cmdObj, BSONObj(), BSONObj()); + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, BSONObj(), BSONObj()); } return gatherResponses(opCtx, nss.db(), readPref, retryPolicy, requests); @@ -528,27 +527,21 @@ AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy) { - const auto query = routingInfo.cm() - ? routingInfo.cm()->getShardKeyPattern().getKeyPattern().globalMin() - : BSONObj(); + const auto query = + cm.isSharded() ? cm.getShardKeyPattern().getKeyPattern().globalMin() : BSONObj(); - auto responses = - gatherResponses(opCtx, - nss.db(), - readPref, - retryPolicy, - buildVersionedRequestsForTargetedShards(opCtx, - nss, - routingInfo, - {} /* shardsToSkip */, - cmdObj, - query, - BSONObj() /* collation */)); + auto responses = gatherResponses( + opCtx, + nss.db(), + readPref, + retryPolicy, + buildVersionedRequestsForTargetedShards( + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, query, BSONObj() /* collation */)); return std::move(responses.front()); } @@ -749,31 +742,31 @@ void createShardDatabase(OperationContext* opCtx, StringData dbName) { } std::set<ShardId> getTargetedShardsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& query, const BSONObj& collation) { - if (routingInfo.cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation. + if (cm.isSharded()) { + // The collection is sharded. Use the routing table to decide which shards to target based + // on the query and collation. std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); return shardIds; } // The collection is unsharded. Target only the primary shard for the database. - return {routingInfo.db().primaryId()}; + return {cm.dbPrimary()}; } std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const BSONObj& query, const BSONObj& collation) { std::vector<std::pair<ShardId, BSONObj>> requests; auto ars_requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, {} /* shardsToSkip */, cmdObj, query, collation); + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, query, collation); std::transform(std::make_move_iterator(ars_requests.begin()), std::make_move_iterator(ars_requests.end()), std::back_inserter(requests), @@ -784,8 +777,8 @@ std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( return requests; } -StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( - OperationContext* opCtx, const NamespaceString& nss) { +StatusWith<ChunkManager> getCollectionRoutingInfoForTxnCmd(OperationContext* opCtx, + const NamespaceString& nss) { auto catalogCache = Grid::get(opCtx)->catalogCache(); invariant(catalogCache); @@ -808,29 +801,29 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( StatusWith<Shard::QueryResponse> loadIndexesFromAuthoritativeShard(OperationContext* opCtx, const NamespaceString& nss) { - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); auto [indexShard, listIndexesCmd] = [&]() -> std::pair<std::shared_ptr<Shard>, BSONObj> { auto cmdNoVersion = applyReadWriteConcern( opCtx, true /* appendRC */, false /* appendWC */, BSON("listIndexes" << nss.coll())); - if (routingInfo.cm()) { + if (cm.isSharded()) { // For a sharded collection we must load indexes from a shard with chunks. For // consistency with cluster listIndexes, load from the shard that owns the minKey chunk. - const auto minKeyShardId = routingInfo.cm()->getMinKeyShardIdWithSimpleCollation(); - auto minKeyShard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId)); - return {minKeyShard, - appendShardVersion(cmdNoVersion, routingInfo.cm()->getVersion(minKeyShardId))}; + const auto minKeyShardId = cm.getMinKeyShardIdWithSimpleCollation(); + return { + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId)), + appendShardVersion(cmdNoVersion, cm.getVersion(minKeyShardId))}; } else { // For an unsharded collection, the primary shard will have correct indexes. We attach // unsharded shard version to detect if the collection has become sharded. const auto cmdObjWithShardVersion = - (routingInfo.db().primaryId() != ShardRegistry::kConfigServerShardId) + (cm.dbPrimary() != ShardRegistry::kConfigServerShardId) ? appendShardVersion(cmdNoVersion, ChunkVersion::UNSHARDED()) : cmdNoVersion; - return {routingInfo.db().primary(), - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo.db())}; + return { + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())), + appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}; } }(); diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 243e0d5b7d7..11985ac9682 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -87,7 +87,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTarg std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const BSONObj& query, @@ -192,7 +192,7 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, @@ -215,7 +215,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, @@ -275,7 +275,7 @@ AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy); @@ -336,7 +336,7 @@ void createShardDatabase(OperationContext* opCtx, StringData dbName); * info. */ std::set<ShardId> getTargetedShardsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& query, const BSONObj& collation); @@ -347,7 +347,7 @@ std::set<ShardId> getTargetedShardsForQuery(boost::intrusive_ptr<ExpressionConte std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const BSONObj& query, const BSONObj& collation); @@ -360,8 +360,8 @@ std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( * * Should be used by all router commands that can be run in a transaction when targeting shards. */ -StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( - OperationContext* opCtx, const NamespaceString& nss); +StatusWith<ChunkManager> getCollectionRoutingInfoForTxnCmd(OperationContext* opCtx, + const NamespaceString& nss); /** * Loads all of the indexes for the given namespace from the appropriate shard. For unsharded diff --git a/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp b/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp index 9fd74b44b71..0750a0b4b47 100644 --- a/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp +++ b/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp @@ -73,10 +73,9 @@ public: } void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, ns())); - const auto cm = routingInfo.cm(); uassert(ErrorCodes::InvalidOptions, "bounds can only have exactly 2 elements", @@ -93,21 +92,20 @@ public: boost::optional<Chunk> chunk; if (request().getFind()) { - BSONObj shardKey = - uassertStatusOK(cm->getShardKeyPattern().extractShardKeyFromQuery( - opCtx, ns(), *request().getFind())); + BSONObj shardKey = uassertStatusOK(cm.getShardKeyPattern().extractShardKeyFromQuery( + opCtx, ns(), *request().getFind())); uassert(51260, str::stream() << "no shard key found in chunk query " << *request().getFind(), !shardKey.isEmpty()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(shardKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(shardKey)); } else { auto boundsArray = *request().getBounds(); - BSONObj minKey = cm->getShardKeyPattern().normalizeShardKey(boundsArray.front()); - BSONObj maxKey = cm->getShardKeyPattern().normalizeShardKey(boundsArray.back()); + BSONObj minKey = cm.getShardKeyPattern().normalizeShardKey(boundsArray.front()); + BSONObj maxKey = cm.getShardKeyPattern().normalizeShardKey(boundsArray.back()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(minKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(minKey)); uassert(51261, str::stream() << "no chunk found with the shard key bounds " @@ -117,7 +115,7 @@ public: } ConfigsvrClearJumboFlag configCmd( - ns(), cm->getVersion().epoch(), chunk->getMin(), chunk->getMax()); + ns(), cm.getVersion().epoch(), chunk->getMin(), chunk->getMax()); configCmd.setDbName(request().getDbName()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); diff --git a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp index c493f2a72f5..f65f88c119a 100644 --- a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp @@ -111,13 +111,13 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (routingInfo.cm()) { + if (cm.isSharded()) { result.appendBool("sharded", true); } else { result.appendBool("sharded", false); - result.append("primary", routingInfo.db().primaryId().toString()); + result.append("primary", cm.dbPrimary().toString()); } int scale = 1; @@ -138,7 +138,7 @@ public: opCtx, nss.db(), nss, - routingInfo, + cm, applyReadWriteConcern( opCtx, this, @@ -261,7 +261,7 @@ public: result.append("maxSize", maxSize / scale); result.append("nindexes", nindexes); result.append("scaleFactor", scale); - result.append("nchunks", (routingInfo.cm() ? routingInfo.cm()->numChunks() : 1)); + result.append("nchunks", cm.numChunks()); result.append("shards", shardStats.obj()); return true; diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index 7359c79e910..9f836644ff5 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -176,10 +176,10 @@ public: CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); } - const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + const auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); if (repl::ReadConcernArgs::get(opCtx).getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && - !opCtx->inMultiDocumentTransaction() && routingInfo.cm()) { + !opCtx->inMultiDocumentTransaction() && cm.isSharded()) { uasserted(ErrorCodes::InvalidOptions, "readConcern level \"snapshot\" prohibited for \"distinct\" command on" " sharded collection"); @@ -191,7 +191,7 @@ public: opCtx, nss.db(), nss, - routingInfo, + cm, applyReadWriteConcern( opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), @@ -226,12 +226,11 @@ public: return true; } - BSONObjComparator bsonCmp( - BSONObj(), - BSONObjComparator::FieldNamesMode::kConsider, - !collation.isEmpty() - ? collator.get() - : (routingInfo.cm() ? routingInfo.cm()->getDefaultCollator() : nullptr)); + BSONObjComparator bsonCmp(BSONObj(), + BSONObjComparator::FieldNamesMode::kConsider, + !collation.isEmpty() + ? collator.get() + : (cm.isSharded() ? cm.getDefaultCollator() : nullptr)); BSONObjSet all = bsonCmp.makeBSONObjSet(); for (const auto& response : shardResponses) { diff --git a/src/mongo/s/commands/cluster_filemd5_cmd.cpp b/src/mongo/s/commands/cluster_filemd5_cmd.cpp index 734695db71b..d351a7be7a3 100644 --- a/src/mongo/s/commands/cluster_filemd5_cmd.cpp +++ b/src/mongo/s/commands/cluster_filemd5_cmd.cpp @@ -85,16 +85,15 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - const auto callShardFn = [opCtx, &nss, &routingInfo](const BSONObj& cmdObj, - const BSONObj& routingQuery) { + const auto callShardFn = [&](const BSONObj& cmdObj, const BSONObj& routingQuery) { auto shardResults = scatterGatherVersionedTargetByRoutingTable(opCtx, nss.db(), nss, - routingInfo, + cm, cmdObj, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, @@ -113,9 +112,9 @@ public: // If the collection is not sharded, or is sharded only on the 'files_id' field, we only // need to target a single shard, because the files' chunks can only be contained in a // single sharded chunk - if (!routingInfo.cm() || - SimpleBSONObjComparator::kInstance.evaluate( - routingInfo.cm()->getShardKeyPattern().toBSON() == BSON("files_id" << 1))) { + if (!cm.isSharded() || + SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == + BSON("files_id" << 1))) { CommandHelpers::filterCommandReplyForPassthrough( callShardFn( applyReadWriteConcern( @@ -130,9 +129,8 @@ public: uassert(ErrorCodes::IllegalOperation, "The GridFS fs.chunks collection must be sharded on either {files_id:1} or " "{files_id:1, n:1}", - SimpleBSONObjComparator::kInstance.evaluate( - routingInfo.cm()->getShardKeyPattern().toBSON() == - BSON("files_id" << 1 << "n" << 1))); + SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == + BSON("files_id" << 1 << "n" << 1))); // Theory of operation: // diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 03d32d8edda..0dba2c8ba40 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -208,27 +208,25 @@ public: const BSONObj& cmdObj = request.body; const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - boost::optional<ChunkManager> chunkMgr; std::shared_ptr<Shard> shard; - if (!routingInfo.cm()) { - shard = routingInfo.db().primary(); - } else { - chunkMgr.emplace(*routingInfo.cm()); - + if (cm.isSharded()) { const BSONObj query = cmdObj.getObjectField("query"); const BSONObj collation = getCollation(cmdObj); const auto let = getLet(cmdObj); const auto rc = getRuntimeConstants(cmdObj); const BSONObj shardKey = - getShardKey(opCtx, *chunkMgr, nss, query, collation, verbosity, let, rc); - const auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); + getShardKey(opCtx, cm, nss, query, collation, verbosity, let, rc); + const auto chunk = cm.findIntersectingChunk(shardKey, collation); shard = uassertStatusOK( Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk.getShardId())); + } else { + shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())); } const auto explainCmd = ClusterExplain::wrapAsExplain( @@ -238,10 +236,10 @@ public: Timer timer; BSONObjBuilder bob; - if (chunkMgr) { + if (cm.isSharded()) { _runCommand(opCtx, shard->getId(), - chunkMgr->getVersion(shard->getId()), + cm.getVersion(shard->getId()), boost::none, nss, applyReadWriteConcern(opCtx, false, false, explainCmd), @@ -250,7 +248,7 @@ public: _runCommand(opCtx, shard->getId(), ChunkVersion::UNSHARDED(), - routingInfo.db().databaseVersion(), + cm.dbVersion(), nss, applyReadWriteConcern(opCtx, false, false, explainCmd), &bob); @@ -286,31 +284,30 @@ public: // Append mongoS' runtime constants to the command object before forwarding it to the shard. auto cmdObjForShard = appendRuntimeConstantsToCommandObject(opCtx, cmdObj); - const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - if (!routingInfo.cm()) { + const auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + if (!cm.isSharded()) { _runCommand(opCtx, - routingInfo.db().primaryId(), + cm.dbPrimary(), ChunkVersion::UNSHARDED(), - routingInfo.db().databaseVersion(), + cm.dbVersion(), nss, applyReadWriteConcern(opCtx, this, cmdObjForShard), &result); return true; } - const auto chunkMgr = routingInfo.cm(); - const BSONObj query = cmdObjForShard.getObjectField("query"); const BSONObj collation = getCollation(cmdObjForShard); const auto let = getLet(cmdObjForShard); const auto rc = getRuntimeConstants(cmdObjForShard); const BSONObj shardKey = - getShardKey(opCtx, *chunkMgr, nss, query, collation, boost::none, let, rc); - auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); + getShardKey(opCtx, cm, nss, query, collation, boost::none, let, rc); + + auto chunk = cm.findIntersectingChunk(shardKey, collation); _runCommand(opCtx, chunk.getShardId(), - chunkMgr->getVersion(chunk.getShardId()), + cm.getVersion(chunk.getShardId()), boost::none, nss, applyReadWriteConcern(opCtx, this, cmdObjForShard), diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp index 2d7cf7fdda8..e83722c71e4 100644 --- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp @@ -101,13 +101,11 @@ public: result.append("version", cachedDbInfo.databaseVersion().toBSON()); } else { // Return the collection's information. - auto cachedCollInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + const auto cm = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << nss.ns() << " is not sharded.", - cachedCollInfo.cm()); - const auto cm = cachedCollInfo.cm(); - cm->getVersion().appendLegacyWithField(&result, "version"); + cm.isSharded()); + cm.getVersion().appendLegacyWithField(&result, "version"); if (cmdObj["fullMetadata"].trueValue()) { BSONArrayBuilder chunksArrBuilder; @@ -116,9 +114,9 @@ public: LOGV2(22753, "Routing info requested by getShardVersion: {routingInfo}", "Routing info requested by getShardVersion", - "routingInfo"_attr = redact(cm->toString())); + "routingInfo"_attr = redact(cm.toString())); - cm->forEachChunk([&](const auto& chunk) { + cm.forEachChunk([&](const auto& chunk) { if (!exceedsSizeLimit) { BSONArrayBuilder chunkBB(chunksArrBuilder.subarrayStart()); chunkBB.append(chunk.getMin()); diff --git a/src/mongo/s/commands/cluster_list_indexes_cmd.cpp b/src/mongo/s/commands/cluster_list_indexes_cmd.cpp index 8a4d93a3297..79bc042a0d7 100644 --- a/src/mongo/s/commands/cluster_list_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_indexes_cmd.cpp @@ -41,14 +41,14 @@ namespace { bool cursorCommandPassthroughShardWithMinKeyChunk(OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, BSONObjBuilder* out, const PrivilegeVector& privileges) { auto response = executeCommandAgainstShardWithMinKeyChunk( opCtx, nss, - routingInfo, + cm, CommandHelpers::filterCommandRequestForPassthrough(cmdObj), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); @@ -121,13 +121,13 @@ public: CommandHelpers::handleMarkKillOnClientDisconnect(opCtx); const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); return cursorCommandPassthroughShardWithMinKeyChunk( opCtx, nss, - routingInfo, + cm, applyReadWriteConcern(opCtx, this, cmdObj), &result, {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)}); diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 7908cdec77b..c6a166b942c 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -58,12 +58,12 @@ namespace { auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr, - boost::optional<CachedCollectionRoutingInfo> routingInfo, + const ChunkManager& cm, boost::optional<ExplainOptions::Verbosity> verbosity) { // Populate the collection UUID and the appropriate collation to use. auto nss = parsedMr.getNamespace(); auto [collationObj, uuid] = cluster_aggregation_planner::getCollationAndUUID( - routingInfo, nss, parsedMr.getCollation().get_value_or(BSONObj())); + opCtx, cm, nss, parsedMr.getCollation().get_value_or(BSONObj())); std::unique_ptr<CollatorInterface> resolvedCollator; if (!collationObj.isEmpty()) { @@ -154,9 +154,9 @@ bool runAggregationMapReduce(OperationContext* opCtx, involvedNamespaces.insert(resolvedOutNss); } - auto routingInfo = uassertStatusOK( + auto cm = uassertStatusOK( sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace())); - auto expCtx = makeExpressionContext(opCtx, parsedMr, routingInfo, verbosity); + auto expCtx = makeExpressionContext(opCtx, parsedMr, cm, verbosity); const auto pipelineBuilder = [&]() { return map_reduce_common::translateFromMR(parsedMr, expCtx); @@ -176,7 +176,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, cluster_aggregation_planner::AggregationTargeter::make(opCtx, parsedMr.getNamespace(), pipelineBuilder, - routingInfo, + cm, involvedNamespaces, false, // hasChangeStream true); // allowedToPassthrough @@ -187,14 +187,14 @@ bool runAggregationMapReduce(OperationContext* opCtx, // needed in the normal aggregation path. For this translation, though, we need to // build the pipeline to serialize and send to the primary shard. auto serialized = serializeToCommand(cmd, parsedMr, pipelineBuilder().get()); - uassertStatusOK(cluster_aggregation_planner::runPipelineOnPrimaryShard( - expCtx, - namespaces, - targeter.routingInfo->db(), - verbosity, - std::move(serialized), - privileges, - &tempResults)); + uassertStatusOK( + cluster_aggregation_planner::runPipelineOnPrimaryShard(expCtx, + namespaces, + *targeter.cm, + verbosity, + std::move(serialized), + privileges, + &tempResults)); break; } diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index 26b9435f91a..b4157bee9d9 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -103,10 +103,9 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); vector<BSONObj> bounds; if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) { @@ -136,19 +135,19 @@ public: return false; } - if (!cm->getShardKeyPattern().isShardKey(minKey) || - !cm->getShardKeyPattern().isShardKey(maxKey)) { + if (!cm.getShardKeyPattern().isShardKey(minKey) || + !cm.getShardKeyPattern().isShardKey(maxKey)) { errmsg = str::stream() << "shard key bounds " << "[" << minKey << "," << maxKey << ")" - << " are not valid for shard key pattern " << cm->getShardKeyPattern().toBSON(); + << " are not valid for shard key pattern " << cm.getShardKeyPattern().toBSON(); return false; } - minKey = cm->getShardKeyPattern().normalizeShardKey(minKey); - maxKey = cm->getShardKeyPattern().normalizeShardKey(maxKey); + minKey = cm.getShardKeyPattern().normalizeShardKey(minKey); + maxKey = cm.getShardKeyPattern().normalizeShardKey(maxKey); - const auto firstChunk = cm->findIntersectingChunkWithSimpleCollation(minKey); + const auto firstChunk = cm.findIntersectingChunkWithSimpleCollation(minKey); BSONObjBuilder remoteCmdObjB; remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::nsField()]); @@ -158,7 +157,7 @@ public: Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString().toString()); remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk.getShardId().toString()); - remoteCmdObjB.append("epoch", cm->getVersion().epoch()); + remoteCmdObjB.append("epoch", cm.getVersion().epoch()); BSONObj remoteResult; @@ -175,10 +174,10 @@ public: Shard::RetryPolicy::kNotIdempotent)); uassertStatusOK(response.commandStatus); - Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(routingInfo), - firstChunk.getShardId()); - CommandHelpers::filterCommandReplyForPassthrough(response.response, &result); + Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection( + nss, firstChunk.getShardId()); + CommandHelpers::filterCommandReplyForPassthrough(response.response, &result); return true; } diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index 398d8fc49c4..01cdb91234e 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -99,10 +99,9 @@ public: const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); const auto toElt = cmdObj["to"]; uassert(ErrorCodes::TypeMismatch, @@ -149,29 +148,29 @@ public: if (!find.isEmpty()) { // find - BSONObj shardKey = uassertStatusOK( - cm->getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); + BSONObj shardKey = + uassertStatusOK(cm.getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); if (shardKey.isEmpty()) { errmsg = str::stream() << "no shard key found in chunk query " << find; return false; } - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(shardKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(shardKey)); } else { // bounds - if (!cm->getShardKeyPattern().isShardKey(bounds[0].Obj()) || - !cm->getShardKeyPattern().isShardKey(bounds[1].Obj())) { + if (!cm.getShardKeyPattern().isShardKey(bounds[0].Obj()) || + !cm.getShardKeyPattern().isShardKey(bounds[1].Obj())) { errmsg = str::stream() << "shard key bounds " << "[" << bounds[0].Obj() << "," << bounds[1].Obj() << ")" - << " are not valid for shard key pattern " << cm->getShardKeyPattern().toBSON(); + << " are not valid for shard key pattern " << cm.getShardKeyPattern().toBSON(); return false; } - BSONObj minKey = cm->getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); - BSONObj maxKey = cm->getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); + BSONObj minKey = cm.getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); + BSONObj maxKey = cm.getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(minKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(minKey)); if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) { errmsg = str::stream() << "no chunk found with the shard key bounds " @@ -188,7 +187,7 @@ public: chunkType.setMin(chunk->getMin()); chunkType.setMax(chunk->getMax()); chunkType.setShard(chunk->getShardId()); - chunkType.setVersion(cm->getVersion()); + chunkType.setVersion(cm.getVersion()); uassertStatusOK(configsvr_client::moveChunk(opCtx, chunkType, diff --git a/src/mongo/s/commands/cluster_passthrough_commands.cpp b/src/mongo/s/commands/cluster_passthrough_commands.cpp index ccb5ee5799d..1919b0e87ae 100644 --- a/src/mongo/s/commands/cluster_passthrough_commands.cpp +++ b/src/mongo/s/commands/cluster_passthrough_commands.cpp @@ -49,24 +49,17 @@ namespace { bool nonShardedCollectionCommandPassthrough(OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, Shard::RetryPolicy retryPolicy, BSONObjBuilder* out) { const StringData cmdName(cmdObj.firstElementFieldName()); uassert(ErrorCodes::IllegalOperation, str::stream() << "Can't do command: " << cmdName << " on a sharded collection", - !routingInfo.cm()); - - auto responses = scatterGatherVersionedTargetByRoutingTable(opCtx, - dbName, - nss, - routingInfo, - cmdObj, - ReadPreferenceSetting::get(opCtx), - retryPolicy, - {}, - {}); + !cm.isSharded()); + + auto responses = scatterGatherVersionedTargetByRoutingTable( + opCtx, dbName, nss, cm, cmdObj, ReadPreferenceSetting::get(opCtx), retryPolicy, {}, {}); invariant(responses.size() == 1); const auto cmdResponse = uassertStatusOK(std::move(responses.front().swResponse)); @@ -119,23 +112,23 @@ public: str::stream() << "Invalid target namespace: " << toNss.ns(), toNss.isValid()); - const auto fromRoutingInfo = uassertStatusOK( + const auto fromCM = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fromNss)); - uassert(13138, "You can't rename a sharded collection", !fromRoutingInfo.cm()); + uassert(13138, "You can't rename a sharded collection", !fromCM.isSharded()); - const auto toRoutingInfo = uassertStatusOK( + const auto toCM = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, toNss)); - uassert(13139, "You can't rename to a sharded collection", !toRoutingInfo.cm()); + uassert(13139, "You can't rename to a sharded collection", !toCM.isSharded()); uassert(13137, "Source and destination collections must be on same shard", - fromRoutingInfo.db().primaryId() == toRoutingInfo.db().primaryId()); + fromCM.dbPrimary() == toCM.dbPrimary()); return nonShardedCollectionCommandPassthrough( opCtx, NamespaceString::kAdminDb, fromNss, - fromRoutingInfo, + fromCM, applyReadWriteConcern( opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), Shard::RetryPolicy::kNoRetry, @@ -173,11 +166,11 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::IllegalOperation, "You can't convertToCapped a sharded collection", - !routingInfo.cm()); + !cm.isSharded()); // convertToCapped creates a temp collection and renames it at the end. It will require // special handling for create collection. @@ -185,7 +178,7 @@ public: opCtx, dbName, nss, - routingInfo, + cm, applyReadWriteConcern( opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), Shard::RetryPolicy::kIdempotent, @@ -230,13 +223,11 @@ public: "Performing splitVector across dbs isn't supported via mongos", nss.db() == dbName); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::IllegalOperation, str::stream() << "can't do command: " << getName() << " on sharded collection", - !routingInfo.cm()); - - const auto primaryShard = routingInfo.db().primary(); + !cm.isSharded()); // Here, we first filter the command before appending an UNSHARDED shardVersion, because // "shardVersion" is one of the fields that gets filtered out. @@ -245,11 +236,14 @@ public: BSONObj filteredCmdObjWithVersion( appendShardVersion(filteredCmdObj, ChunkVersion::UNSHARDED())); - auto commandResponse = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( + auto shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())); + auto commandResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting::get(opCtx), dbName, - primaryShard->isConfig() ? filteredCmdObj : filteredCmdObjWithVersion, + cm.dbPrimary() == ShardRegistry::kConfigServerShardId ? filteredCmdObj + : filteredCmdObjWithVersion, Shard::RetryPolicy::kIdempotent)); uassert(ErrorCodes::IllegalOperation, @@ -260,7 +254,7 @@ public: if (!commandResponse.writeConcernStatus.isOK()) { appendWriteConcernErrorToCmdResponse( - primaryShard->getId(), commandResponse.response["writeConcernError"], result); + cm.dbPrimary(), commandResponse.response["writeConcernError"], result); } result.appendElementsUnique( CommandHelpers::filterCommandReplyForPassthrough(std::move(commandResponse.response))); diff --git a/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp index 3f115594172..0e1768c7ed4 100644 --- a/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp +++ b/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp @@ -54,7 +54,7 @@ public: void typedRun(OperationContext* opCtx) { const NamespaceString& nss = ns(); - const auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); @@ -64,7 +64,7 @@ public: } ConfigsvrRefineCollectionShardKey configsvrRefineCollShardKey( - nss, request().getKey(), routingInfo.cm()->getVersion().epoch()); + nss, request().getKey(), cm.getVersion().epoch()); configsvrRefineCollShardKey.setDbName(request().getDbName()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp index f5ea1d18fee..19d33b3f10b 100644 --- a/src/mongo/s/commands/cluster_split_cmd.cpp +++ b/src/mongo/s/commands/cluster_split_cmd.cpp @@ -130,10 +130,9 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); const BSONField<BSONObj> findField("find", BSONObj()); const BSONField<BSONArray> boundsField("bounds", BSONArray()); @@ -193,29 +192,29 @@ public: if (!find.isEmpty()) { // find - BSONObj shardKey = uassertStatusOK( - cm->getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); + BSONObj shardKey = + uassertStatusOK(cm.getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); if (shardKey.isEmpty()) { errmsg = str::stream() << "no shard key found in chunk query " << find; return false; } - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(shardKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(shardKey)); } else if (!bounds.isEmpty()) { // bounds - if (!cm->getShardKeyPattern().isShardKey(bounds[0].Obj()) || - !cm->getShardKeyPattern().isShardKey(bounds[1].Obj())) { + if (!cm.getShardKeyPattern().isShardKey(bounds[0].Obj()) || + !cm.getShardKeyPattern().isShardKey(bounds[1].Obj())) { errmsg = str::stream() << "shard key bounds " << "[" << bounds[0].Obj() << "," << bounds[1].Obj() << ")" - << " are not valid for shard key pattern " << cm->getShardKeyPattern().toBSON(); + << " are not valid for shard key pattern " << cm.getShardKeyPattern().toBSON(); return false; } - BSONObj minKey = cm->getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); - BSONObj maxKey = cm->getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); + BSONObj minKey = cm.getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); + BSONObj maxKey = cm.getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(minKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(minKey)); if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) { errmsg = str::stream() << "no chunk found with the shard key bounds " @@ -224,15 +223,15 @@ public: } } else { // middle - if (!cm->getShardKeyPattern().isShardKey(middle)) { + if (!cm.getShardKeyPattern().isShardKey(middle)) { errmsg = str::stream() << "new split key " << middle << " is not valid for shard key pattern " - << cm->getShardKeyPattern().toBSON(); + << cm.getShardKeyPattern().toBSON(); return false; } - middle = cm->getShardKeyPattern().normalizeShardKey(middle); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(middle)); + middle = cm.getShardKeyPattern().normalizeShardKey(middle); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(middle)); if (chunk->getMin().woCompare(middle) == 0 || chunk->getMax().woCompare(middle) == 0) { errmsg = str::stream() @@ -251,7 +250,7 @@ public: : selectMedianKey(opCtx, chunk->getShardId(), nss, - cm->getShardKeyPattern(), + cm.getShardKeyPattern(), ChunkRange(chunk->getMin(), chunk->getMax())); LOGV2(22758, @@ -266,15 +265,13 @@ public: shardutil::splitChunkAtMultiplePoints(opCtx, chunk->getShardId(), nss, - cm->getShardKeyPattern(), - cm->getVersion(), + cm.getShardKeyPattern(), + cm.getVersion(), ChunkRange(chunk->getMin(), chunk->getMax()), {splitPoint})); - // This invalidation is only necessary so that auto-split can begin to track statistics for - // the chunks produced after the split instead of the single original chunk. - Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(routingInfo), - chunk->getShardId()); + Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, + chunk->getShardId()); return true; } diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 9254504dfd9..72e38cd19fc 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -39,7 +39,6 @@ #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/query/cluster_cursor_manager.h" diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 56103395066..d85252e309c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -142,9 +142,9 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx, void updateHostsTargetedMetrics(OperationContext* opCtx, const NamespaceString& executionNss, - boost::optional<CachedCollectionRoutingInfo> executionNsRoutingInfo, + const boost::optional<ChunkManager>& cm, stdx::unordered_set<NamespaceString> involvedNamespaces) { - if (!executionNsRoutingInfo) + if (!cm) return; // Create a set of ShardIds that own a chunk belonging to any of the collections involved in @@ -153,9 +153,9 @@ void updateHostsTargetedMetrics(OperationContext* opCtx, std::set<ShardId> shardsOwningChunks = [&]() { std::set<ShardId> shardsIds; - if (executionNsRoutingInfo->cm()) { + if (cm->isSharded()) { std::set<ShardId> shardIdsForNs; - executionNsRoutingInfo->cm()->getAllShardIds(&shardIdsForNs); + cm->getAllShardIds(&shardIdsForNs); for (const auto& shardId : shardIdsForNs) { shardsIds.insert(shardId); } @@ -165,11 +165,11 @@ void updateHostsTargetedMetrics(OperationContext* opCtx, if (nss == executionNss) continue; - const auto resolvedNsRoutingInfo = + const auto resolvedNsCM = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - if (resolvedNsRoutingInfo.cm()) { + if (resolvedNsCM.isSharded()) { std::set<ShardId> shardIdsForNs; - resolvedNsRoutingInfo.cm()->getAllShardIds(&shardIdsForNs); + resolvedNsCM.getAllShardIds(&shardIdsForNs); for (const auto& shardId : shardIdsForNs) { shardsIds.insert(shardId); } @@ -219,9 +219,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, !request.getCollectionUUID()); const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) { - const auto resolvedNsRoutingInfo = - uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - return bool(resolvedNsRoutingInfo.cm()); + const auto resolvedNsCM = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + return resolvedNsCM.isSharded(); }; liteParsedPipeline.verifyIsSupported( @@ -235,11 +234,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // $changeStream, we allow the operation to continue so that stream cursors can be established // on the given namespace before the database or collection is actually created. If the database // does not exist and this is not a $changeStream, then we return an empty cursor. - boost::optional<CachedCollectionRoutingInfo> routingInfo; + boost::optional<ChunkManager> cm; auto executionNsRoutingInfoStatus = sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); if (executionNsRoutingInfoStatus.isOK()) { - routingInfo = std::move(executionNsRoutingInfoStatus.getValue()); + cm = std::move(executionNsRoutingInfoStatus.getValue()); } else if (!(hasChangeStream && executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { appendEmptyResultSetWithStatus( @@ -261,7 +260,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } return cluster_aggregation_planner::getCollationAndUUID( - routingInfo, namespaces.executionNss, request.getCollation()); + opCtx, cm, namespaces.executionNss, request.getCollation()); }(); // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, @@ -280,7 +279,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, opCtx, namespaces.executionNss, pipelineBuilder, - routingInfo, + cm, involvedNamespaces, hasChangeStream, liteParsedPipeline.allowedToPassthroughFromMongos()); @@ -306,7 +305,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return cluster_aggregation_planner::runPipelineOnPrimaryShard( expCtx, namespaces, - targeter.routingInfo->db(), + *targeter.cm, request.getExplain(), request.serializeToCommandObj(), privileges, @@ -352,7 +351,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, }(); if (status.isOK()) { - updateHostsTargetedMetrics(opCtx, namespaces.executionNss, routingInfo, involvedNamespaces); + updateHostsTargetedMetrics(opCtx, namespaces.executionNss, cm, involvedNamespaces); // Report usage statistics for each stage in the pipeline. liteParsedPipeline.tickGlobalStageCounters(); } diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 4fb474d2f8f..2c74736235a 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -29,18 +29,12 @@ #pragma once -#include <memory> -#include <vector> - #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" -#include "mongo/s/async_requests_sender.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/commands/strategy.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/document_source_merge_cursors.h" diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 3f589ba2df5..25b7a4e71f2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -162,7 +162,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const ClusterAggregate::Namespaces& namespaces, Document serializedCommand, long long batchSize, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const boost::optional<ChunkManager>& cm, DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result, const PrivilegeVector& privileges, @@ -200,12 +200,10 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we // therefore must have a valid routing table. - invariant(routingInfo); + invariant(cm); - const ShardId mergingShardId = pickMergingShard(opCtx, - shardDispatchResults.needsPrimaryShardMerge, - targetedShards, - routingInfo->db().primaryId()); + const ShardId mergingShardId = pickMergingShard( + opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cm->dbPrimary()); const bool mergingShardContributesData = std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != targetedShards.end(); @@ -481,8 +479,11 @@ ClusterClientCursorGuard convertPipelineToRouterStages( /** * Returns the output of the listCollections command filtered to the namespace 'nss'. */ -BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) { - ScopedDbConnection conn(primaryShard->getConnString()); +BSONObj getUnshardedCollInfo(OperationContext* opCtx, + const ShardId& shardId, + const NamespaceString& nss) { + auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); + ScopedDbConnection conn(shard->getConnString()); std::list<BSONObj> all = conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); if (all.empty()) { @@ -492,7 +493,6 @@ BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& n return all.front(); } - /** * Returns the collection default collation or the simple collator if there is no default. If the * collection does not exist, then returns an empty BSON Object. @@ -551,7 +551,7 @@ AggregationTargeter AggregationTargeter::make( OperationContext* opCtx, const NamespaceString& executionNss, const std::function<std::unique_ptr<Pipeline, PipelineDeleter>()> buildPipelineFn, - boost::optional<CachedCollectionRoutingInfo> routingInfo, + boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, bool allowedToPassthrough) { @@ -559,9 +559,9 @@ AggregationTargeter AggregationTargeter::make( // Check if any of the involved collections are sharded. bool involvesShardedCollections = [&]() { for (const auto& nss : involvedNamespaces) { - const auto resolvedNsRoutingInfo = + const auto resolvedNsCM = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - if (resolvedNsRoutingInfo.cm()) { + if (resolvedNsCM.isSharded()) { return true; } } @@ -573,7 +573,7 @@ AggregationTargeter AggregationTargeter::make( sharded_agg_helpers::mustRunOnAllShards(executionNss, hasChangeStream); // If we don't have a routing table, then this is a $changeStream which must run on all shards. - invariant(routingInfo || (mustRunOnAll && hasChangeStream)); + invariant(cm || (mustRunOnAll && hasChangeStream)); // A pipeline is allowed to passthrough to the primary shard iff the following conditions are // met: @@ -585,20 +585,20 @@ AggregationTargeter AggregationTargeter::make( // $currentOp. // 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions // needs to include information about users that can only be deduced on mongos. - if (routingInfo && !routingInfo->cm() && !mustRunOnAll && allowedToPassthrough && + if (cm && !cm->isSharded() && !mustRunOnAll && allowedToPassthrough && !involvesShardedCollections) { - return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, routingInfo}; + return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cm}; } else { auto pipeline = buildPipelineFn(); auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired : TargetingPolicy::kAnyShard; - return AggregationTargeter{policy, std::move(pipeline), routingInfo}; + return AggregationTargeter{policy, std::move(pipeline), cm}; } } Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& expCtx, const ClusterAggregate::Namespaces& namespaces, - const CachedDatabaseInfo& dbInfo, + const ChunkManager& cm, boost::optional<ExplainOptions::Verbosity> explain, Document serializedCommand, const PrivilegeVector& privileges, @@ -615,7 +615,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& sharded_agg_helpers::createPassthroughCommandForShard( expCtx, serializedCommand, explain, boost::none, nullptr, BSONObj()))); - const auto shardId = dbInfo.primary()->getId(); + const auto shardId = cm.dbPrimary(); const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId) ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) : std::move(cmdObj); @@ -624,7 +624,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), namespaces.executionNss.db().toString(), - {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}}, + {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}}, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); auto response = ars.next(); @@ -746,7 +746,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, namespaces, serializedCommand, batchSize, - targeter.routingInfo, + targeter.cm, std::move(shardDispatchResults), result, privileges, @@ -754,11 +754,12 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, } std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + OperationContext* opCtx, + const boost::optional<ChunkManager>& cm, const NamespaceString& nss, const BSONObj& collation) { - const bool collectionIsSharded = (routingInfo && routingInfo->cm()); - const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); + const bool collectionIsSharded = (cm && cm->isSharded()); + const bool collectionIsNotSharded = (cm && !cm->isSharded()); // If this is a collectionless aggregation, we immediately return the user- // defined collation if one exists, or an empty BSONObj otherwise. Collectionless aggregations @@ -770,14 +771,13 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( } // If the collection is unsharded, obtain collInfo from the primary shard. - const auto unshardedCollInfo = collectionIsNotSharded - ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss) - : BSONObj(); + const auto unshardedCollInfo = + collectionIsNotSharded ? getUnshardedCollInfo(opCtx, cm->dbPrimary(), nss) : BSONObj(); // Return the collection UUID if available, or boost::none otherwise. const auto getUUID = [&]() -> auto { if (collectionIsSharded) { - return routingInfo->cm()->getUUID(); + return cm->getUUID(); } else { return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"] ? boost::optional<UUID>{uassertStatusOK( @@ -796,9 +796,8 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( if (collectionIsNotSharded) { return getDefaultCollationForUnshardedCollection(unshardedCollInfo); } else { - return routingInfo->cm()->getDefaultCollator() - ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON() - : CollationSpec::kSimpleSpec; + return cm->getDefaultCollator() ? cm->getDefaultCollator()->getSpec().toBSON() + : CollationSpec::kSimpleSpec; } }; diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index e5b9d968223..5beaed7b8f2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -62,7 +62,8 @@ ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, * collectionless namespace. */ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + OperationContext* opCtx, + const boost::optional<ChunkManager>& cm, const NamespaceString& nss, const BSONObj& collation); @@ -78,7 +79,7 @@ struct AggregationTargeter { OperationContext* opCtx, const NamespaceString& executionNss, const std::function<std::unique_ptr<Pipeline, PipelineDeleter>()> buildPipelineFn, - boost::optional<CachedCollectionRoutingInfo> routingInfo, + boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, bool allowedToPassthrough); @@ -90,12 +91,12 @@ struct AggregationTargeter { } policy; std::unique_ptr<Pipeline, PipelineDeleter> pipeline; - boost::optional<CachedCollectionRoutingInfo> routingInfo; + boost::optional<ChunkManager> cm; }; Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& expCtx, const ClusterAggregate::Namespaces& namespaces, - const CachedDatabaseInfo& dbInfo, + const ChunkManager& cm, boost::optional<ExplainOptions::Verbosity> explain, Document serializedCommand, const PrivilegeVector& privileges, diff --git a/src/mongo/s/query/cluster_exchange_test.cpp b/src/mongo/s/query/cluster_exchange_test.cpp index 8324a56e7e0..bb5ef977d46 100644 --- a/src/mongo/s/query/cluster_exchange_test.cpp +++ b/src/mongo/s/query/cluster_exchange_test.cpp @@ -44,8 +44,8 @@ #include "mongo/util/scopeguard.h" namespace mongo { - namespace { + using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor; using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; @@ -608,5 +608,6 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) { future.default_timed_get(); } + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8dabfd0bf63..3996e01c326 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -172,7 +172,7 @@ StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards( */ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( OperationContext* opCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardIds, const CanonicalQuery& query, bool appendGeoNearDistanceProjection) { @@ -202,12 +202,11 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( BSONObjBuilder cmdBuilder; qrToForward->asFindCommand(&cmdBuilder); - if (routingInfo.cm()) { - routingInfo.cm()->getVersion(shardId).appendToCommand(&cmdBuilder); + if (cm.isSharded()) { + cm.getVersion(shardId).appendToCommand(&cmdBuilder); } else if (!query.nss().isOnInternalDb()) { ChunkVersion::UNSHARDED().appendToCommand(&cmdBuilder); - auto dbVersion = routingInfo.db().databaseVersion(); - cmdBuilder.append("databaseVersion", dbVersion.toBSON()); + cmdBuilder.append("databaseVersion", cm.dbVersion().toBSON()); } if (opCtx->getTxnNumber()) { @@ -221,11 +220,11 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( } void updateNumHostsTargetedMetrics(OperationContext* opCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, int nTargetedShards) { int nShardsOwningChunks = 0; - if (routingInfo.cm()) { - nShardsOwningChunks = routingInfo.cm()->getNShardsOwningChunks(); + if (cm.isSharded()) { + nShardsOwningChunks = cm.getNShardsOwningChunks(); } auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( @@ -237,12 +236,12 @@ void updateNumHostsTargetedMetrics(OperationContext* opCtx, CursorId runQueryWithoutRetrying(OperationContext* opCtx, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, std::vector<BSONObj>* results, bool* partialResultsReturned) { // Get the set of shards on which we will run the query. auto shardIds = getTargetedShardsForQuery(query.getExpCtx(), - routingInfo, + cm, query.getQueryRequest().getFilter(), query.getQueryRequest().getCollation()); @@ -306,8 +305,8 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // Construct the requests that we will use to establish cursors on the targeted shards, // attaching the shardVersion and txnNumber, if necessary. - auto requests = constructRequestsForShards( - opCtx, routingInfo, shardIds, query, appendGeoNearDistanceProjection); + auto requests = + constructRequestsForShards(opCtx, cm, shardIds, query, appendGeoNearDistanceProjection); // Establish the cursors with a consistent shardVersion across shards. params.remotes = establishCursors(opCtx, @@ -398,7 +397,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorExhausted = true; if (shardIds.size() > 0) { - updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size()); + updateNumHostsTargetedMetrics(opCtx, cm, shardIds.size()); } return CursorId(0); } @@ -419,7 +418,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorid = cursorId; if (shardIds.size() > 0) { - updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size()); + updateNumHostsTargetedMetrics(opCtx, cm, shardIds.size()); } return cursorId; diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index 729a42edd19..1fedd6aea16 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -82,4 +82,5 @@ void killRemoteCursor(OperationContext* opCtx, executor::TaskExecutor* executor, RemoteCursor&& cursor, const NamespaceString& nss); + } // namespace mongo diff --git a/src/mongo/s/query/sharded_agg_test_fixture.h b/src/mongo/s/query/sharded_agg_test_fixture.h index f72d6c2f6ec..0bfedd9f5db 100644 --- a/src/mongo/s/query/sharded_agg_test_fixture.h +++ b/src/mongo/s/query/sharded_agg_test_fixture.h @@ -109,8 +109,8 @@ public: return response; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); + const auto cm = future.default_timed_get(); + ASSERT(cm->isSharded()); } protected: diff --git a/src/mongo/s/sessions_collection_sharded.cpp b/src/mongo/s/sessions_collection_sharded.cpp index d41f71bd617..060c1158dbd 100644 --- a/src/mongo/s/sessions_collection_sharded.cpp +++ b/src/mongo/s/sessions_collection_sharded.cpp @@ -62,19 +62,17 @@ BSONObj lsidQuery(const LogicalSessionId& lsid) { std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { - auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, NamespaceString::kLogicalSessionsNamespace)); - auto cm = routingInfo.cm(); - uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace << " is not sharded", - cm); + cm.isSharded()); std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard; for (const auto& session : sessions) { sessionIdsByOwningShard.emplace( - cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + cm.findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), session); } @@ -89,19 +87,17 @@ std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwnin std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard( OperationContext* opCtx, const LogicalSessionRecordSet& sessions) { - auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, NamespaceString::kLogicalSessionsNamespace)); - auto cm = routingInfo.cm(); - uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace << " is not sharded", - cm); + cm.isSharded()); std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard; for (const auto& session : sessions) { sessionsByOwningShard.emplace( - cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + cm.findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), session); } @@ -124,12 +120,11 @@ void SessionsCollectionSharded::checkSessionsCollectionExists(OperationContext* Grid::get(opCtx)->isShardingInitialized()); // If the collection doesn't exist, fail. Only the config servers generate it. - const auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( opCtx, NamespaceString::kLogicalSessionsNamespace)); - uassert( - ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", routingInfo.cm()); + uassert(ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", cm.isSharded()); } void SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index 33ceee5e0d5..05db6bd6ce9 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -82,7 +82,6 @@ Status initializeGlobalShardingState(OperationContext* opCtx, rpc::ShardingEgressMetadataHookBuilder hookBuilder, boost::optional<size_t> taskExecutorPoolSize); - /** * Loads cluster ID and waits for the reload of the Shard Registry. */ diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index b9bd33d9782..4a492481be4 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -39,7 +39,6 @@ #include "mongo/db/ops/write_ops_parsers.h" #include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/cluster_commands_helpers.h" -#include "mongo/s/grid.h" #include "mongo/s/transaction_router.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index ec79a4ec1e4..f7189efdfe9 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -171,7 +171,7 @@ BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext> * { _id : { $lt : 30 } } => false * { foo : <anything> } => false */ -bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager* cm) { +bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager& cm) { auto shardKey = kVirtualIdShardKey.extractShardKeyFromQuery(query); BSONElement idElt = shardKey["_id"]; @@ -179,9 +179,9 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const return false; } - if (CollationIndexKey::isCollatableType(idElt.type()) && cm && + if (CollationIndexKey::isCollatableType(idElt.type()) && cm.isSharded() && !query.getQueryRequest().getCollation().isEmpty() && - !CollatorInterface::collatorsMatch(query.getCollator(), cm->getDefaultCollator())) { + !CollatorInterface::collatorsMatch(query.getCollator(), cm.getDefaultCollator())) { // The collation applies to the _id field, but the user specified a collation which doesn't // match the collection default. @@ -195,7 +195,7 @@ bool isExactIdQuery(OperationContext* opCtx, const NamespaceString& nss, const BSONObj query, const BSONObj collation, - const ChunkManager* cm) { + const ChunkManager& cm) { auto qr = std::make_unique<QueryRequest>(nss); qr->setFilter(query); if (!collation.isEmpty()) { @@ -242,10 +242,9 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA, return CompareResult_GTE; } -ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, - const ShardId& shardId) { - if (routingInfo.cm()) { - return routingInfo.cm()->getVersion(shardId); +ChunkVersion getShardVersion(const ChunkManager& cm, const ShardId& shardId) { + if (cm.isSharded()) { + return cm.getVersion(shardId); } return ChunkVersion::UNSHARDED(); @@ -262,7 +261,7 @@ ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, * Note that the signature here is weird since our cached map of chunk versions is stored in a * ChunkManager or is implicit in the primary shard of the collection. */ -CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, +CompareResult compareAllShardVersions(const ChunkManager& cm, const StaleShardVersionMap& remoteShardVersions) { CompareResult finalResult = CompareResult_GTE; @@ -274,7 +273,7 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing try { // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion(routingInfo, shardId); + cachedShardVersion = getShardVersion(cm, shardId); } catch (const DBException& ex) { LOGV2_WARNING(22915, "could not lookup shard {shardId} in local cache, shard metadata may " @@ -301,9 +300,8 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing return finalResult; } -CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo, - const DatabaseVersion& remoteDbVersion) { - DatabaseVersion cachedDbVersion = routingInfo.db().databaseVersion(); +CompareResult compareDbVersions(const ChunkManager& cm, const DatabaseVersion& remoteDbVersion) { + DatabaseVersion cachedDbVersion = cm.dbVersion(); // Db may have been dropped if (cachedDbVersion.getUuid() != remoteDbVersion.getUuid()) { @@ -321,15 +319,16 @@ CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo, /** * Whether or not the manager/primary pair is different from the other manager/primary pair. */ -bool isMetadataDifferent(const ChunkManager* managerA, +bool isMetadataDifferent(const ChunkManager& managerA, const DatabaseVersion dbVersionA, - const ChunkManager* managerB, + const ChunkManager& managerB, const DatabaseVersion dbVersionB) { - if ((managerA && !managerB) || (!managerA && managerB)) + if ((managerA.isSharded() && !managerB.isSharded()) || + (!managerA.isSharded() && managerB.isSharded())) return true; - if (managerA) { - return managerA->getVersion() != managerB->getVersion(); + if (managerA.isSharded()) { + return managerA.getVersion() != managerB.getVersion(); } return !databaseVersion::equal(dbVersionA, dbVersionB); @@ -346,13 +345,13 @@ ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, void ChunkManagerTargeter::_init(OperationContext* opCtx) { createShardDatabase(opCtx, _nss.db()); - _routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); + _cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); if (_targetEpoch) { - uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm()); + uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _cm->isSharded()); uassert(ErrorCodes::StaleEpoch, "Collection epoch has changed", - _routingInfo->cm()->getVersion().epoch() == *_targetEpoch); + _cm->getVersion().epoch() == *_targetEpoch); } } @@ -364,8 +363,8 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, const BSONObj& doc) const { BSONObj shardKey; - if (_routingInfo->cm()) { - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); + if (_cm->isSharded()) { + shardKey = _cm->getShardKeyPattern().extractShardKeyFromDoc(doc); // The shard key would only be empty after extraction if we encountered an error case, such // as the shard key possessing an array value or array descendants. If the shard key // presented to the targeter was empty, we would emplace the missing fields, and the @@ -381,9 +380,7 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize())); } - return ShardEndpoint(_routingInfo->db().primary()->getId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()); + return ShardEndpoint(_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()); } std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx, @@ -405,13 +402,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard. - if (!_routingInfo->cm()) { - return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()}}; + if (!_cm->isSharded()) { + return std::vector<ShardEndpoint>{ + {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}}; } - const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern(); + const auto& shardKeyPattern = _cm->getShardKeyPattern(); const auto collation = write_ops::collationOf(updateOp); auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx, @@ -468,7 +464,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* "a single shard (and have the simple collation), but this update targeted " << endPoints.size() << " shards. Update request: " << updateOp.toBSON() << ", shard key pattern: " << shardKeyPattern.toString(), - updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, _routingInfo->cm())); + updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, *_cm)); // If the request is {multi:false}, then this is a single op-style update which we are // broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics. @@ -492,13 +488,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* itemRef.getRuntimeConstants()); BSONObj shardKey; - if (_routingInfo->cm()) { + if (_cm->isSharded()) { // Sharded collections have the following further requirements for targeting: // // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - shardKey = - uassertStatusOK(_routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery( - expCtx, deleteOp.getQ())); + shardKey = uassertStatusOK( + _cm->getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ())); } // Target the shard key or delete query @@ -530,10 +525,9 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* str::stream() << "A single delete on a sharded collection must contain an exact match " "on _id (and have the collection default collation) or contain the " "shard key (and have the simple collation). Delete request: " - << deleteOp.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString(), - !_routingInfo->cm() || deleteOp.getMulti() || - isExactIdQuery(opCtx, *cq, _routingInfo->cm())); + << deleteOp.toBSON() + << ", shard key pattern: " << _cm->getShardKeyPattern().toString(), + !_cm->isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, *_cm)); return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation)); } @@ -542,22 +536,21 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( boost::intrusive_ptr<ExpressionContext> expCtx, const BSONObj& query, const BSONObj& collation) const { - if (!_routingInfo->cm()) { - return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()}}; + if (!_cm->isSharded()) { + return std::vector<ShardEndpoint>{ + {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}}; } std::set<ShardId> shardIds; try { - _routingInfo->cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + _cm->getShardIdsForQuery(expCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); + endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId)); } return endpoints; @@ -567,8 +560,8 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s const BSONObj& collation, long long estDataSize) const { try { - auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); - return {{chunk.getShardId(), _routingInfo->cm()->getVersion(chunk.getShardId())}}; + auto chunk = _cm->findIntersectingChunk(shardKey, collation); + return {{chunk.getShardId(), _cm->getVersion(chunk.getShardId())}}; } catch (const DBException& ex) { return ex.toStatus(); } @@ -578,14 +571,14 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const { // This function is only called if doing a multi write that targets more than one shard. This // implies the collection is sharded, so we should always have a chunk manager. - invariant(_routingInfo->cm()); + invariant(_cm->isSharded()); std::vector<ShardId> shardIds; Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); + endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId)); } return endpoints; @@ -605,7 +598,7 @@ void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint, ChunkVersion remoteShardVersion; if (!staleInfo.getVersionWanted()) { // If we don't have a vWanted sent, assume the version is higher than our current version. - remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); + remoteShardVersion = getShardVersion(*_cm, endpoint.shardName); remoteShardVersion.incMajor(); } else { remoteShardVersion = *staleInfo.getVersionWanted(); @@ -637,7 +630,7 @@ void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint, DatabaseVersion remoteDbVersion; if (!staleInfo.getVersionWanted()) { // If the vWanted is not set, assume the wanted version is higher than our current version. - remoteDbVersion = _routingInfo->db().databaseVersion(); + remoteDbVersion = _cm->dbVersion(); remoteDbVersion = databaseVersion::makeIncremented(remoteDbVersion); } else { remoteDbVersion = *staleInfo.getVersionWanted(); @@ -690,8 +683,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha // Get the latest metadata information from the cache if there were issues // - auto lastManager = _routingInfo->cm(); - auto lastDbVersion = _routingInfo->db().databaseVersion(); + auto lastManager = *_cm; + auto lastDbVersion = _cm->dbVersion(); _init(opCtx); @@ -710,8 +703,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha // If we couldn't target, we might need to refresh if we haven't remotely refreshed // the metadata since we last got it from the cache. - bool alreadyRefreshed = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + bool alreadyRefreshed = + isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { @@ -720,13 +713,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } else if (!_remoteShardVersions.empty()) { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); + CompareResult result = compareAllShardVersions(*_cm, _remoteShardVersions); LOGV2_DEBUG(22913, 4, @@ -743,13 +735,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } else if (_remoteDbVersion) { // If we got stale database versions from the remote shard, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareDbVersions(*_routingInfo, *_remoteDbVersion); + CompareResult result = compareDbVersions(*_cm, *_remoteDbVersion); LOGV2_DEBUG(22914, 4, @@ -767,18 +758,17 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } } bool ChunkManagerTargeter::endpointIsConfigServer() const { - if (!_routingInfo->cm()) { - return _routingInfo->db().primaryId() == ShardRegistry::kConfigServerShardId; + if (!_cm->isSharded()) { + return _cm->dbPrimary() == ShardRegistry::kConfigServerShardId; } std::set<ShardId> shardIds; - _routingInfo->cm()->getAllShardIds(&shardIds); + _cm->getAllShardIds(&shardIds); if (std::any_of(shardIds.begin(), shardIds.end(), [](const auto& shardId) { return shardId == ShardRegistry::kConfigServerShardId; @@ -792,8 +782,8 @@ bool ChunkManagerTargeter::endpointIsConfigServer() const { } int ChunkManagerTargeter::getNShardsOwningChunks() const { - if (_routingInfo->cm()) { - return _routingInfo->cm()->getNShardsOwningChunks(); + if (_cm->isSharded()) { + return _cm->getNShardsOwningChunks(); } return 0; diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index 1e62bc2eeb5..caaa8884399 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -35,7 +35,7 @@ #include "mongo/bson/bsonobj_comparator_interface.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/ns_targeter.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -151,7 +151,7 @@ private: bool _needsTargetingRefresh; // The latest loaded routing cache entry - boost::optional<CachedCollectionRoutingInfo> _routingInfo; + boost::optional<ChunkManager> _cm; // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and // find a new epoch, we immediately throw a StaleEpoch exception. diff --git a/src/mongo/util/lru_cache.h b/src/mongo/util/lru_cache.h index adb35affa7c..6cd4e8b691b 100644 --- a/src/mongo/util/lru_cache.h +++ b/src/mongo/util/lru_cache.h @@ -58,12 +58,12 @@ class LRUCache { LRUCache(const LRUCache&) = delete; LRUCache& operator=(const LRUCache&) = delete; -public: - explicit LRUCache(std::size_t maxSize) : _maxSize(maxSize) {} - LRUCache(LRUCache&&) = delete; LRUCache& operator=(LRUCache&&) = delete; +public: + explicit LRUCache(std::size_t maxSize) : _maxSize(maxSize) {} + using ListEntry = std::pair<K, V>; using List = std::list<ListEntry>; @@ -88,27 +88,27 @@ public: */ boost::optional<std::pair<K, V>> add(const K& key, V entry) { // If the key already exists, delete it first. - auto i = this->_map.find(key); - if (i != this->_map.end()) { - this->_list.erase(i->second); + auto i = _map.find(key); + if (i != _map.end()) { + _list.erase(i->second); } - this->_list.push_front(std::make_pair(key, std::move(entry))); - this->_map[key] = this->_list.begin(); + _list.push_front(std::make_pair(key, std::move(entry))); + _map[key] = _list.begin(); // If the store has grown beyond its allowed size, // evict the least recently used entry. - if (this->size() > this->_maxSize) { - auto pair = std::move(this->_list.back()); + if (size() > _maxSize) { + auto pair = std::move(_list.back()); - this->_map.erase(pair.first); - this->_list.pop_back(); + _map.erase(pair.first); + _list.pop_back(); - invariant(this->size() <= this->_maxSize); + invariant(size() <= _maxSize); return std::move(pair); } - invariant(this->size() <= this->_maxSize); + invariant(size() <= _maxSize); return boost::none; } @@ -116,7 +116,7 @@ public: * Finds an element in the cache by key. */ iterator find(const K& key) { - return this->promote(key); + return promote(key); } /** @@ -129,11 +129,11 @@ public: * properly. */ const_iterator cfind(const K& key) const { - auto it = this->_map.find(key); + auto it = _map.find(key); // TODO(SERVER-28890): Remove the function-style cast when MSVC's // `std::list< ... >::iterator` implementation doesn't conflict with their `/Zc:ternary` // flag support . - return (it == this->_map.end()) ? this->end() : const_iterator(it->second); + return (it == _map.end()) ? end() : const_iterator(it->second); } /** @@ -141,8 +141,8 @@ public: * to the least recently used element. */ iterator promote(const K& key) { - auto it = this->_map.find(key); - return (it == this->_map.end()) ? this->end() : this->promote(it->second); + auto it = _map.find(key); + return (it == _map.end()) ? end() : promote(it->second); } /** @@ -150,12 +150,12 @@ public: * recently used element in the cache. */ iterator promote(const iterator& iter) { - if (iter == this->_list.end()) { + if (iter == _list.end()) { return iter; } - this->_list.splice(this->_list.begin(), this->_list, iter); - return this->_list.begin(); + _list.splice(_list.begin(), _list, iter); + return _list.begin(); } /** @@ -163,12 +163,12 @@ public: * least recently used element in the cache. */ const_iterator promote(const const_iterator& iter) { - if (iter == this->_list.cend()) { + if (iter == _list.cend()) { return iter; } - this->_list.splice(this->_list.begin(), this->_list, iter); - return this->_list.begin(); + _list.splice(_list.begin(), _list, iter); + return _list.begin(); } /** @@ -176,13 +176,13 @@ public: * exists. Returns the count of elements erased. */ typename Map::size_type erase(const K& key) { - auto it = this->_map.find(key); - if (it == this->_map.end()) { + auto it = _map.find(key); + if (it == _map.end()) { return 0; } - this->_list.erase(it->second); - this->_map.erase(it); + _list.erase(it->second); + _map.erase(it); return 1; } @@ -192,17 +192,17 @@ public: * element, or the end iterator, if no such element exists. */ iterator erase(iterator it) { - invariant(it != this->_list.end()); - invariant(this->_map.erase(it->first) == 1); - return this->_list.erase(it); + invariant(it != _list.end()); + invariant(_map.erase(it->first) == 1); + return _list.erase(it); } /** * Removes all items from the cache. */ void clear() { - this->_map.clear(); - this->_list.clear(); + _map.clear(); + _list.clear(); } /** @@ -210,64 +210,64 @@ public: * Otherwise, returns false. */ bool hasKey(const K& key) const { - return _map.find(key) != this->_map.end(); + return _map.find(key) != _map.end(); } /** * Returns the number of elements currently in the cache. */ std::size_t size() const { - return this->_list.size(); + return _list.size(); } bool empty() const { - return this->_list.empty(); + return _list.empty(); } /** * Returns an iterator pointing to the most recently used element in the cache. */ iterator begin() { - return this->_list.begin(); + return _list.begin(); } /** * Returns an iterator pointing past the least recently used element in the cache. */ iterator end() { - return this->_list.end(); + return _list.end(); } /** * Returns a const_iterator pointing to the most recently used element in the cache. */ const_iterator begin() const { - return this->_list.begin(); + return _list.begin(); } /** * Returns a const_iterafor pointing past the least recently used element in the cache. */ const_iterator end() const { - return this->_list.end(); + return _list.end(); } /** * Returns a const_iterator pointing to the most recently used element in the cache. */ const_iterator cbegin() const { - return this->_list.cbegin(); + return _list.cbegin(); } /** * Returns a const_iterator pointing past the least recently used element in the cache. */ const_iterator cend() const { - return this->_list.cend(); + return _list.cend(); } typename Map::size_type count(const K& key) const { - return this->_map.count(key); + return _map.count(key); } private: |