diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-08-25 01:42:28 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-01 10:50:45 +0000 |
commit | 9cb82d2e8680717d3002459ba5fdb16036183a17 (patch) | |
tree | 6d49e2f6a2bba23707285e90ec1e8b3beba41400 /src/mongo | |
parent | ca4df25002a60910b38bfdd8d71eb5bff5a79b49 (diff) | |
download | mongo-9cb82d2e8680717d3002459ba5fdb16036183a17.tar.gz |
SERVER-50505 Make the CatalogCache return ChunkManager(s) directly
... instead of returning the intermediate CachedCollectionRoutingInfo
class. The ChunkManager should be the only class used for routing.
Diffstat (limited to 'src/mongo')
72 files changed, 737 insertions, 946 deletions
diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp index 6c93d22d318..4e4ddb99af7 100644 --- a/src/mongo/db/commands/feature_compatibility_version.cpp +++ b/src/mongo/db/commands/feature_compatibility_version.cpp @@ -54,8 +54,6 @@ #include "mongo/db/write_concern_options.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" #include "mongo/transport/service_entry_point.h" namespace mongo { diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 2f7a8e06cb5..48b3c04c5b7 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -58,7 +58,6 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/database_version_helpers.h" -#include "mongo/s/grid.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 1d4480cb4ba..7a859ccbbd6 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -52,8 +52,6 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index cadf989486d..c42def9ca86 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -57,8 +57,6 @@ #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/update/update_oplog_entry_version.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" namespace mongo { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 88429000f86..c7b8bd273ec 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -33,13 +33,10 @@ #include "mongo/db/pipeline/pipeline_d.h" -#include <memory> - #include "mongo/base/exact_cast.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -75,19 +72,12 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" -#include "mongo/db/transaction_participant.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_version.h" -#include "mongo/s/grid.h" #include "mongo/s/query/document_source_merge_cursors.h" -#include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/time_support.h" namespace mongo { diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index 9faf9b11eaa..330ef41693e 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -47,7 +47,6 @@ #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" #include "mongo/util/net/socket_utils.h" @@ -139,11 +138,10 @@ std::vector<BSONObj> CommonProcessInterface::getCurrentOps( std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsRouter( OperationContext* opCtx, const NamespaceString& nss) const { - auto cri = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (auto chunkManager = cri.cm()) { - return _shardKeyToDocumentKeyFields( - chunkManager->getShardKeyPattern().getKeyPatternFields()); + if (cm.isSharded()) { + return _shardKeyToDocumentKeyFields(cm.getShardKeyPattern().getKeyPatternFields()); } // We have no evidence this collection is sharded, so the document key is just _id. @@ -187,12 +185,12 @@ bool CommonProcessInterface::keyPatternNamesExactPaths(const BSONObj& keyPattern boost::optional<ChunkVersion> CommonProcessInterface::refreshAndGetCollectionVersion( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const { const bool forceRefreshFromThisThread = false; - auto routingInfo = uassertStatusOK( + auto cm = uassertStatusOK( Grid::get(expCtx->opCtx) ->catalogCache() ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread)); - if (auto chunkManager = routingInfo.cm()) { - return chunkManager->getVersion(); + if (cm.isSharded()) { + return cm.getVersion(); } return boost::none; } diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 42d05465291..92b87ccb5fc 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -56,25 +56,19 @@ #include "mongo/util/fail_point.h" namespace mongo { - -using boost::intrusive_ptr; -using std::shared_ptr; -using std::string; -using std::unique_ptr; - namespace { /** * Returns the routing information for the namespace set on the passed ExpressionContext. Also * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry. */ -StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( - const intrusive_ptr<ExpressionContext>& expCtx) { +StatusWith<ChunkManager> getCollectionRoutingInfo( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); // Additionally check that the ExpressionContext's UUID matches the collection routing info. - if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().cm()) { - if (!swRoutingInfo.getValue().cm()->uuidMatches(*expCtx->uuid)) { + if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().isSharded()) { + if (!swRoutingInfo.getValue().uuidMatches(*expCtx->uuid)) { return {ErrorCodes::NamespaceNotFound, str::stream() << "The UUID of collection " << expCtx->ns.ns() << " changed; it may have been dropped and re-created."}; @@ -158,10 +152,10 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( str::stream() << "Looking up document matching " << redact(filter.toBson()), [&]() -> std::vector<RemoteCursor> { // Verify that the collection exists, with the correct UUID. - auto routingInfo = uassertStatusOK(getCollectionRoutingInfo(foreignExpCtx)); + auto cm = uassertStatusOK(getCollectionRoutingInfo(foreignExpCtx)); // Finalize the 'find' command object based on the routing table information. - if (findCmdIsByUuid && routingInfo.cm()) { + if (findCmdIsByUuid && cm.isSharded()) { // Find by UUID and shard versioning do not work together (SERVER-31946). In // the sharded case we've already checked the UUID, so find by namespace is // safe. In the unlikely case that the collection has been deleted and a new @@ -176,12 +170,8 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( // single shard will be targeted here; however, in certain cases where only the _id // is present, we may need to scatter-gather the query to all shards in order to // find the document. - auto requests = getVersionedRequestsForTargetedShards(expCtx->opCtx, - nss, - routingInfo, - findCmd, - filterObj, - CollationSpec::kSimpleSpec); + auto requests = getVersionedRequestsForTargetedShards( + expCtx->opCtx, nss, cm, findCmd, filterObj, CollationSpec::kSimpleSpec); // Dispatch the requests. The 'establishCursors' method conveniently prepares the // result into a vector of cursor responses for us. @@ -279,7 +269,7 @@ void MongosProcessInterface::_reportCurrentOpsForTransactionCoordinators( OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {}; std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( - const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { + const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { invariant(hasGlobalServiceContext()); auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); invariant(cursorManager); @@ -287,9 +277,9 @@ std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( } bool MongosProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfo = + auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - return static_cast<bool>(routingInfo.cm()); + return cm.isSharded(); } bool MongosProcessInterface::fieldsHaveSupportingUniqueIndex( diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index f7356c6ded7..b93a255eb30 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -63,9 +63,9 @@ ShardServerProcessInterface::ShardServerProcessInterface( } bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - return static_cast<bool>(routingInfo.cm()); + return cm.isSharded(); } void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( @@ -85,17 +85,17 @@ ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(Operati invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); auto* const catalogCache = Grid::get(opCtx)->catalogCache(); - auto swCollectionRoutingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); - if (swCollectionRoutingInfo.isOK()) { - auto cm = swCollectionRoutingInfo.getValue().cm(); - if (cm && cm->uuidMatches(uuid)) { + auto swCM = catalogCache->getCollectionRoutingInfo(opCtx, nss); + if (swCM.isOK()) { + const auto& cm = swCM.getValue(); + if (cm.isSharded() && cm.uuidMatches(uuid)) { // Unpack the shard key. Collection is now sharded so the document key fields will never // change, mark as final. - return {_shardKeyToDocumentKeyFields(cm->getShardKeyPattern().getKeyPatternFields()), + return {_shardKeyToDocumentKeyFields(cm.getShardKeyPattern().getKeyPatternFields()), true}; } - } else if (swCollectionRoutingInfo != ErrorCodes::NamespaceNotFound) { - uassertStatusOK(std::move(swCollectionRoutingInfo)); + } else if (swCM != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(std::move(swCM)); } // An unsharded collection can still become sharded so is not final. If the uuid doesn't match diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index e9b0e3f48ae..e4275b4a3d9 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -154,25 +154,23 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, return cmdForShards.freeze().toBson(); } -std::vector<RemoteCursor> establishShardCursors( - OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor> executor, - const NamespaceString& nss, - bool hasChangeStream, - boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const std::set<ShardId>& shardIds, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref) { +std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& nss, + bool mustRunOnAll, + boost::optional<ChunkManager>& cm, + const std::set<ShardId>& shardIds, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref) { LOGV2_DEBUG(20904, 1, "Dispatching command {cmdObj} to establish cursors on shards", "cmdObj"_attr = redact(cmdObj)); - const bool mustRunOnAll = mustRunOnAllShards(nss, hasChangeStream); std::vector<std::pair<ShardId, BSONObj>> requests; // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo || mustRunOnAll); + invariant(cm || mustRunOnAll); if (mustRunOnAll) { // The pipeline contains a stage which must be run on all shards. Skip versioning and @@ -180,22 +178,21 @@ std::vector<RemoteCursor> establishShardCursors( for (const auto& shardId : shardIds) { requests.emplace_back(shardId, cmdObj); } - } else if (routingInfo->cm()) { + } else if (cm->isSharded()) { // The collection is sharded. Use the routing table to decide which shards to target // based on the query and collation, and build versioned requests for them. for (const auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + auto versionedCmdObj = appendShardVersion(cmdObj, cm->getVersion(shardId)); requests.emplace_back(shardId, std::move(versionedCmdObj)); } } else { // The collection is unsharded. Target only the primary shard for the database. // Don't append shard version info when contacting the config servers. - const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig() + const auto cmdObjWithShardVersion = cm->dbPrimary() != ShardRegistry::kConfigServerShardId ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) : cmdObj; - requests.emplace_back(routingInfo->db().primaryId(), - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo->db())); + requests.emplace_back(cm->dbPrimary(), + appendDbVersionIfPresent(cmdObjWithShardVersion, cm->dbVersion())); } if (MONGO_unlikely(shardedAggregateHangBeforeEstablishingShardCursors.shouldFail())) { @@ -218,7 +215,7 @@ std::vector<RemoteCursor> establishShardCursors( std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expCtx, bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const boost::optional<ChunkManager>& cm, const BSONObj shardQuery, const BSONObj collation) { if (mustRunOnAllShards) { @@ -228,10 +225,8 @@ std::set<ShardId> getTargetedShards(boost::intrusive_ptr<ExpressionContext> expC return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())}; } - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo); - - return getTargetedShardsForQuery(expCtx, *routingInfo, shardQuery, collation); + invariant(cm); + return getTargetedShardsForQuery(expCtx, *cm, shardQuery, collation); } /** @@ -657,9 +652,9 @@ boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationConte return boost::none; } - const auto routingInfo = + const auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, mergeStage->getOutputNs())); - if (!routingInfo.cm()) { + if (!cm.isSharded()) { return boost::none; } @@ -671,7 +666,7 @@ boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationConte // inserted on. With this ability we can insert an exchange on the shards to partition the // documents based on which shard will end up owning them. Then each shard can perform a merge // of only those documents which belong to it (optimistically, barring chunk migrations). - return walkPipelineBackwardsTrackingShardKey(opCtx, mergePipeline, *routingInfo.cm()); + return walkPipelineBackwardsTrackingShardKey(opCtx, mergePipeline, cm); } SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { @@ -794,7 +789,7 @@ DispatchShardPipelineResults dispatchShardPipeline( auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() ? std::move(executionNsRoutingInfoStatus.getValue()) - : boost::optional<CachedCollectionRoutingInfo>{}; + : boost::optional<ChunkManager>{}; // Determine whether we can run the entire aggregation on a single shard. const auto collationObj = expCtx->getCollatorBSON(); @@ -808,7 +803,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // - The pipeline contains one or more stages which must always merge on mongoS. const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || (needsPrimaryShardMerge && executionNsRoutingInfo && - *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); + *(shardIds.begin()) != executionNsRoutingInfo->dbPrimary())); boost::optional<ShardedExchangePolicy> exchangeSpec; boost::optional<SplitPipeline> splitPipelines; @@ -898,7 +893,7 @@ DispatchShardPipelineResults dispatchShardPipeline( cursors = establishShardCursors(opCtx, expCtx->mongoProcessInterface->taskExecutor, expCtx->ns, - hasChangeStream, + mustRunOnAll, executionNsRoutingInfo, shardIds, targetedCommand, @@ -926,7 +921,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // must increment the number of involved shards. CurOp::get(opCtx)->debug().nShards = shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && - !shardIds.count(executionNsRoutingInfo->db().primaryId())); + !shardIds.count(executionNsRoutingInfo->dbPrimary())); return DispatchShardPipelineResults{needsPrimaryShardMerge, std::move(ownedCursors), @@ -1099,8 +1094,8 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) { return BSON("pipeline" << explainBuilder.done()); } -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss) { +StatusWith<ChunkManager> getExecutionNsRoutingInfo(OperationContext* opCtx, + const NamespaceString& execNss) { // First, verify that there are shards present in the cluster. If not, then we return the // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on @@ -1109,12 +1104,11 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte // aggregations do when the database does not exist. std::vector<ShardId> shardIds; Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - if (shardIds.size() == 0) { + if (shardIds.empty()) { return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; } - // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not - // exist. + // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not exist return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index b537406cb6e..13a20fee607 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -35,8 +35,6 @@ #include "mongo/s/query/owned_remote_cursor.h" namespace mongo { -class CachedCollectionRoutingInfo; - namespace sharded_agg_helpers { /** @@ -172,8 +170,8 @@ Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, * Returns 'ShardNotFound' or 'NamespaceNotFound' if there are no shards in the cluster or if * collection 'execNss' does not exist, respectively. */ -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss); +StatusWith<ChunkManager> getExecutionNsRoutingInfo(OperationContext* opCtx, + const NamespaceString& execNss); /** * Returns true if an aggregation over 'nss' must run on all shards. diff --git a/src/mongo/db/s/README.md b/src/mongo/db/s/README.md index b4737b57764..bf23835067c 100644 --- a/src/mongo/db/s/README.md +++ b/src/mongo/db/s/README.md @@ -99,11 +99,9 @@ collection or database. A full refresh occurs when: * [The CatalogCache (routing table cache) class](https://github.com/mongodb/mongo/blob/master/src/mongo/s/catalog_cache.h) * [The CachedDatabaseInfo class](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L61-L81) -* [The CachedCollectionRoutingInfo class](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L83-L119) Methods that will mark routing table cache information as stale (sharded collection). -* [onStaleShardVersion](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L207-L213) * [invalidateShardOrEntireCollectionEntryForShardedCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L226-L236) * [invalidateShardForShardedCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L262-L268) * [invalidateEntriesThatReferenceShard](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L270-L274) diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index d91670c60d8..8cdcbc7c8ec 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -50,7 +50,6 @@ #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/balancer_collection_status_gen.h" @@ -612,13 +611,13 @@ Status Balancer::_splitChunksIfNeeded(OperationContext* opCtx) { return routingInfoStatus.getStatus(); } - auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); auto splitStatus = shardutil::splitChunkAtMultiplePoints(opCtx, splitInfo.shardId, splitInfo.nss, - cm->getShardKeyPattern(), + cm.getShardKeyPattern(), splitInfo.collectionVersion, ChunkRange(splitInfo.minKey, splitInfo.maxKey), splitInfo.splitKeys); @@ -701,17 +700,16 @@ int Balancer::_moveChunks(OperationContext* opCtx, void Balancer::_splitOrMarkJumbo(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); - auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey); + auto chunk = cm.findIntersectingChunkWithSimpleCollation(minKey); try { const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( opCtx, chunk.getShardId(), nss, - cm->getShardKeyPattern(), + cm.getShardKeyPattern(), ChunkRange(chunk.getMin(), chunk.getMax()), Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), boost::none)); @@ -747,8 +745,8 @@ void Balancer::_splitOrMarkJumbo(OperationContext* opCtx, shardutil::splitChunkAtMultiplePoints(opCtx, chunk.getShardId(), nss, - cm->getShardKeyPattern(), - cm->getVersion(), + cm.getShardKeyPattern(), + cm.getVersion(), ChunkRange(chunk.getMin(), chunk.getMax()), splitPoints)); } catch (const DBException&) { diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index e41e0f977f1..d1aa8aae5a6 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -65,7 +65,10 @@ namespace { * distribution and chunk placement information which is needed by the balancer policy. */ StatusWith<DistributionStatus> createCollectionDistributionStatus( - OperationContext* opCtx, const ShardStatisticsVector& allShards, const ChunkManager& chunkMgr) { + OperationContext* opCtx, + const NamespaceString& nss, + const ShardStatisticsVector& allShards, + const ChunkManager& chunkMgr) { ShardToChunksMap shardToChunksMap; // Makes sure there is an entry in shardToChunksMap for every shard, so empty shards will also @@ -76,7 +79,7 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus( chunkMgr.forEachChunk([&](const auto& chunkEntry) { ChunkType chunk; - chunk.setNS(chunkMgr.getns()); + chunk.setNS(nss); chunk.setMin(chunkEntry.getMin()); chunk.setMax(chunkEntry.getMax()); chunk.setJumbo(chunkEntry.isJumbo()); @@ -89,14 +92,14 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus( }); const auto swCollectionTags = - Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, chunkMgr.getns()); + Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, nss); if (!swCollectionTags.isOK()) { return swCollectionTags.getStatus().withContext( - str::stream() << "Unable to load tags for collection " << chunkMgr.getns()); + str::stream() << "Unable to load tags for collection " << nss); } const auto& collectionTags = swCollectionTags.getValue(); - DistributionStatus distribution(chunkMgr.getns(), std::move(shardToChunksMap)); + DistributionStatus distribution(nss, std::move(shardToChunksMap)); // Cache the collection tags const auto& keyPattern = chunkMgr.getShardKeyPattern().getKeyPattern(); @@ -182,16 +185,16 @@ private: * Populates splitCandidates with chunk and splitPoint pairs for chunks that violate tag * range boundaries. */ -void getSplitCandidatesToEnforceTagRanges(const ChunkManager* cm, +void getSplitCandidatesToEnforceTagRanges(const ChunkManager& cm, const DistributionStatus& distribution, SplitCandidatesBuffer* splitCandidates) { - const auto& globalMax = cm->getShardKeyPattern().getKeyPattern().globalMax(); + const auto& globalMax = cm.getShardKeyPattern().getKeyPattern().globalMax(); // For each tag range, find chunks that need to be split. for (const auto& tagRangeEntry : distribution.tagRanges()) { const auto& tagRange = tagRangeEntry.second; - const auto chunkAtZoneMin = cm->findIntersectingChunkWithSimpleCollation(tagRange.min); + const auto chunkAtZoneMin = cm.findIntersectingChunkWithSimpleCollation(tagRange.min); invariant(chunkAtZoneMin.getMax().woCompare(tagRange.min) > 0); if (chunkAtZoneMin.getMin().woCompare(tagRange.min)) { @@ -202,7 +205,7 @@ void getSplitCandidatesToEnforceTagRanges(const ChunkManager* cm, if (!tagRange.max.woCompare(globalMax)) continue; - const auto chunkAtZoneMax = cm->findIntersectingChunkWithSimpleCollation(tagRange.max); + const auto chunkAtZoneMax = cm.findIntersectingChunkWithSimpleCollation(tagRange.max); // We need to check that both the chunk's minKey does not match the zone's max and also that // the max is not equal, which would only happen in the case of the zone ending in MaxKey. @@ -221,11 +224,11 @@ void getSplitCandidatesToEnforceTagRanges(const ChunkManager* cm, * splitCandidates with chunk and splitPoint pairs for chunks that need to split. */ void getSplitCandidatesForSessionsCollection(OperationContext* opCtx, - const ChunkManager* cm, + const ChunkManager& cm, SplitCandidatesBuffer* splitCandidates) { const auto minNumChunks = minNumChunksForSessionsCollection.load(); - if (cm->numChunks() >= minNumChunks) { + if (cm.numChunks() >= minNumChunks) { return; } @@ -256,7 +259,7 @@ void getSplitCandidatesForSessionsCollection(OperationContext* opCtx, // For each split point, find a chunk that needs to be split. for (auto& splitPoint : splitPoints) { - const auto chunkAtSplitPoint = cm->findIntersectingChunkWithSimpleCollation(splitPoint); + const auto chunkAtSplitPoint = cm.findIntersectingChunkWithSimpleCollation(splitPoint); invariant(chunkAtSplitPoint.getMax().woCompare(splitPoint) > 0); if (chunkAtSplitPoint.getMin().woCompare(splitPoint)) { @@ -458,16 +461,17 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* op const auto& shardStats = shardStatsStatus.getValue(); + const auto& nss = chunk.getNS(); + auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - chunk.getNS()); + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss); if (!routingInfoStatus.isOK()) { return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -485,18 +489,19 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt return shardStatsStatus.getStatus(); } + const auto& nss = chunk.getNS(); + auto shardStats = std::move(shardStatsStatus.getValue()); auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - chunk.getNS()); + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss); if (!routingInfoStatus.isOK()) { return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -527,9 +532,9 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidate return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -537,7 +542,7 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidate const DistributionStatus& distribution = collInfoStatus.getValue(); // Accumulate split points for the same chunk together - SplitCandidatesBuffer splitCandidates(nss, cm->getVersion()); + SplitCandidatesBuffer splitCandidates(nss, cm.getVersion()); if (nss == NamespaceString::kLogicalSessionsNamespace) { if (!distribution.tags().empty()) { @@ -565,11 +570,11 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi return routingInfoStatus.getStatus(); } - const auto cm = routingInfoStatus.getValue().cm(); + const auto& cm = routingInfoStatus.getValue(); - const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern(); + const auto& shardKeyPattern = cm.getShardKeyPattern().getKeyPattern(); - const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, *cm); + const auto collInfoStatus = createCollectionDistributionStatus(opCtx, nss, shardStats, cm); if (!collInfoStatus.isOK()) { return collInfoStatus.getStatus(); } @@ -579,7 +584,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi for (const auto& tagRangeEntry : distribution.tagRanges()) { const auto& tagRange = tagRangeEntry.second; - const auto chunkAtZoneMin = cm->findIntersectingChunkWithSimpleCollation(tagRange.min); + const auto chunkAtZoneMin = cm.findIntersectingChunkWithSimpleCollation(tagRange.min); if (chunkAtZoneMin.getMin().woCompare(tagRange.min)) { return {ErrorCodes::IllegalOperation, @@ -595,7 +600,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi if (!tagRange.max.woCompare(shardKeyPattern.globalMax())) continue; - const auto chunkAtZoneMax = cm->findIntersectingChunkWithSimpleCollation(tagRange.max); + const auto chunkAtZoneMax = cm.findIntersectingChunkWithSimpleCollation(tagRange.max); // We need to check that both the chunk's minKey does not match the zone's max and also that // the max is not equal, which would only happen in the case of the zone ending in MaxKey. diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 923cc9d9151..3169b1fb440 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -182,18 +182,15 @@ Status MigrationManager::executeManualMigration( &scopedMigrationRequests) ->get(); - auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( - opCtx, migrateInfo.nss); - if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); + auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( + opCtx, migrateInfo.nss); + if (!swCM.isOK()) { + return swCM.getStatus(); } - auto& routingInfo = routingInfoStatus.getValue(); - - const auto chunk = - routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey); + const auto& cm = swCM.getValue(); + const auto chunk = cm.findIntersectingChunkWithSimpleCollation(migrateInfo.minKey); Status commandStatus = remoteCommandResponse.status; @@ -333,10 +330,9 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, auto& migrateInfos = nssAndMigrateInfos.second; invariant(!migrateInfos.empty()); - auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss); - if (!routingInfoStatus.isOK()) { + auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( + opCtx, nss); + if (!swCM.isOK()) { // This shouldn't happen because the collection was intact and sharded when the previous // config primary was active and the dist locks have been held by the balancer // throughout. Abort migration recovery. @@ -345,11 +341,11 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, "recovery. Abandoning recovery: {error}", "Unable to reload chunk metadata for collection during balancer recovery", "namespace"_attr = nss, - "error"_attr = redact(routingInfoStatus.getStatus())); + "error"_attr = redact(swCM.getStatus())); return; } - auto& routingInfo = routingInfoStatus.getValue(); + const auto& cm = swCM.getValue(); int scheduledMigrations = 0; @@ -359,8 +355,7 @@ void MigrationManager::finishRecovery(OperationContext* opCtx, auto waitForDelete = migrationType.getWaitForDelete(); migrateInfos.pop_front(); - const auto chunk = - routingInfo.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey); + const auto chunk = cm.findIntersectingChunkWithSimpleCollation(migrationInfo.minKey); if (chunk.getShardId() != migrationInfo.from) { // Chunk is no longer on the source shard specified by this migration. Erase the diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index 90c1628ee20..37a6f797064 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -111,14 +111,14 @@ Status splitChunkAtMultiplePoints(OperationContext* opCtx, */ void moveChunk(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { // We need to have the most up-to-date view of the chunk we are about to move. - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::NamespaceNotSharded, "Could not move chunk. Collection is no longer sharded", - routingInfo.cm()); + cm.isSharded()); - const auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(minKey); + const auto suggestedChunk = cm.findIntersectingChunkWithSimpleCollation(minKey); ChunkType chunkToMove; chunkToMove.setNS(nss); @@ -297,18 +297,17 @@ void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSp try { const auto opCtx = cc().makeOperationContext(); - const auto routingInfo = uassertStatusOK( - Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss)); - const auto cm = routingInfo.cm(); + const auto cm = uassertStatusOK( + Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss)); uassert(ErrorCodes::NamespaceNotSharded, "Could not split chunk. Collection is no longer sharded", - cm); + cm.isSharded()); // Best effort checks that the chunk we're splitting hasn't changed bounds or moved shards // since the auto split task was scheduled. Best effort because the chunk metadata may // change after this point. - const auto chunk = cm->findIntersectingChunkWithSimpleCollation(min); + const auto chunk = cm.findIntersectingChunkWithSimpleCollation(min); uassert(4860100, "Chunk to be auto split has different boundaries than when the split was initiated", chunk.getRange() == ChunkRange(min, max)); @@ -316,7 +315,7 @@ void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSp "Chunk to be auto split isn't owned by this shard", ShardingState::get(opCtx.get())->shardId() == chunk.getShardId()); - const auto& shardKeyPattern = cm->getShardKeyPattern(); + const auto& shardKeyPattern = cm.getShardKeyPattern(); const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration(); // Ensure we have the most up-to-date balancer configuration @@ -395,7 +394,7 @@ void ChunkSplitter::_runAutosplit(std::shared_ptr<ChunkSplitStateDriver> chunkSp chunk.getShardId(), nss, shardKeyPattern, - cm->getVersion(), + cm.getVersion(), chunk.getRange(), splitPoints)); chunkSplitStateDriver->commitSplit(); diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index c275530b384..7da06febea4 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -96,7 +96,7 @@ protected: return std::vector<ChunkType>{chunk1, chunk2, chunk3, chunk4}; }()); - ChunkManager cm(rt, boost::none); + ChunkManager cm(ShardId("0"), DatabaseVersion(UUID::gen(), 1), rt, boost::none); ASSERT_EQ(4, cm.numChunks()); { diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index eaa5d5bcf79..1cb96520d96 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -82,7 +82,8 @@ std::unique_ptr<CollectionMetadata> makeCollectionMetadataImpl( UUID uuid(UUID::gen()); auto rt = RoutingTableHistory::makeNew(kNss, uuid, shardKeyPattern, nullptr, false, epoch, allChunks); - return std::make_unique<CollectionMetadata>(ChunkManager(rt, kChunkManager), kThisShard); + return std::make_unique<CollectionMetadata>( + ChunkManager(kThisShard, DatabaseVersion(UUID::gen(), 1), rt, kChunkManager), kThisShard); } struct ConstructedRangeMap : public RangeMap { diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index 58415ae97eb..77fc9616d93 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -52,6 +52,8 @@ CollectionMetadata makeShardedMetadata(OperationContext* opCtx, UUID uuid = UUID auto range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); auto chunk = ChunkType(kTestNss, std::move(range), ChunkVersion(1, 0, epoch), ShardId("other")); ChunkManager cm( + ShardId("0"), + DatabaseVersion(UUID::gen(), 1), RoutingTableHistory::makeNew( kTestNss, uuid, kShardKeyPattern, nullptr, false, epoch, {std::move(chunk)}), boost::none); diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp index 8710cb63fcd..977fd9640b0 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.cpp +++ b/src/mongo/db/s/config/config_server_test_fixture.cpp @@ -68,7 +68,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/database_version_helpers.h" -#include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/shard_id.h" diff --git a/src/mongo/db/s/config/configsvr_add_shard_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_command.cpp index 058a43b18de..c7003caddb0 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_command.cpp @@ -44,15 +44,12 @@ #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_shard.h" -#include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_request_type.h" #include "mongo/util/str.h" namespace mongo { namespace { -using std::string; - const long long kMaxSizeMBDefault = 0; /** @@ -122,7 +119,7 @@ public: parsedRequest.hasMaxSize() ? parsedRequest.getMaxSize() : kMaxSizeMBDefault); - StatusWith<string> addShardResult = ShardingCatalogManager::get(opCtx)->addShard( + StatusWith<std::string> addShardResult = ShardingCatalogManager::get(opCtx)->addShard( opCtx, parsedRequest.hasName() ? &parsedRequest.getName() : nullptr, parsedRequest.getConnString(), diff --git a/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp index 42a9eef532c..217511db4b7 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp @@ -38,7 +38,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/s/config/sharding_catalog_manager.h" -#include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_to_zone_request_type.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index b9bc5987b04..7ecbb51e33f 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -91,7 +91,7 @@ public: validateZones(request().getZones().get(), authoritativeTags); } - const auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); @@ -112,8 +112,7 @@ public: chunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern()); numInitialChunks = chunks.size(); } else { - numInitialChunks = - request().getNumInitialChunks().get_value_or(routingInfo.cm()->numChunks()); + numInitialChunks = request().getNumInitialChunks().get_value_or(cm.numChunks()); } } diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index 7f83dfaca1d..22a0e23282d 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -42,13 +42,13 @@ #include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/logv2/log.h" -#include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" #include "mongo/util/time_support.h" namespace mongo { namespace { + using TaskExecutor = executor::TaskExecutor; using CallbackArgs = TaskExecutor::CallbackArgs; @@ -136,9 +136,9 @@ std::shared_ptr<ScopedCollectionDescription::Impl> MetadataManager::getActiveMet lg, shared_from_this(), std::move(activeMetadataTracker)); } - auto chunkManager = activeMetadata->getChunkManager(); - ChunkManager chunkManagerAtClusterTime = - ChunkManager(chunkManager->getRoutingHistory(), atClusterTime->asTimestamp()); + auto cm = activeMetadata->getChunkManager(); + ChunkManager chunkManagerAtClusterTime = ChunkManager( + cm->dbPrimary(), cm->dbVersion(), cm->getRoutingHistory(), atClusterTime->asTimestamp()); class MetadataAtTimestamp : public ScopedCollectionDescription::Impl { public: diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index 6ec878a9bc1..d0eb10912ae 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -87,7 +87,8 @@ protected: epoch, {ChunkType{kNss, range, ChunkVersion(1, 0, epoch), kOtherShard}}); - return CollectionMetadata(ChunkManager(rt, boost::none), kThisShard); + return CollectionMetadata( + ChunkManager(kThisShard, DatabaseVersion(UUID::gen(), 1), rt, boost::none), kThisShard); } /** @@ -129,7 +130,8 @@ protected: auto rt = cm->getRoutingHistory()->makeUpdated(splitChunks); - return CollectionMetadata(ChunkManager(rt, boost::none), kThisShard); + return CollectionMetadata(ChunkManager(cm->dbPrimary(), cm->dbVersion(), rt, boost::none), + kThisShard); } static CollectionMetadata cloneMetadataMinusChunk(const CollectionMetadata& metadata, @@ -150,7 +152,8 @@ protected: auto rt = cm->getRoutingHistory()->makeUpdated( {ChunkType(kNss, ChunkRange(minKey, maxKey), chunkVersion, kOtherShard)}); - return CollectionMetadata(ChunkManager(rt, boost::none), kThisShard); + return CollectionMetadata(ChunkManager(cm->dbPrimary(), cm->dbVersion(), rt, boost::none), + kThisShard); } std::shared_ptr<MetadataManager> _manager; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 7d8b0d1c6f3..fe92a9dada3 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -164,7 +164,11 @@ protected: CollectionShardingRuntime::get(operationContext(), kNss) ->setFilteringMetadata( operationContext(), - CollectionMetadata(ChunkManager(rt, boost::none), ShardId("dummyShardId"))); + CollectionMetadata(ChunkManager(ShardId("dummyShardId"), + DatabaseVersion(UUID::gen(), 1), + rt, + boost::none), + ShardId("dummyShardId"))); }(); _client->createIndex(kNss.ns(), kShardKeyPattern); diff --git a/src/mongo/db/s/move_timing_helper.cpp b/src/mongo/db/s/move_timing_helper.cpp index dab096d5511..f3270f67986 100644 --- a/src/mongo/db/s/move_timing_helper.cpp +++ b/src/mongo/db/s/move_timing_helper.cpp @@ -37,7 +37,7 @@ #include "mongo/db/curop.h" #include "mongo/db/s/sharding_logging.h" #include "mongo/logv2/log.h" -#include "mongo/s/grid.h" +#include "mongo/s/catalog/sharding_catalog_client.h" namespace mongo { diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 5e7a4167185..521c4bf7b4f 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -55,7 +55,7 @@ const auto getIsMigrating = OperationContext::declareDecoration<bool>(); * restarted. */ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, - CollectionMetadata const& metadata, + const CollectionMetadata& metadata, const BSONObj& doc) { const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); if (!atClusterTime) @@ -64,8 +64,9 @@ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, auto shardKey = metadata.getShardKeyPattern().extractShardKeyFromDoc(doc); // We can assume the simple collation because shard keys do not support non-simple collations. - ChunkManager chunkManagerAtClusterTime(metadata.getChunkManager()->getRoutingHistory(), - atClusterTime->asTimestamp()); + auto cm = metadata.getChunkManager(); + ChunkManager chunkManagerAtClusterTime( + cm->dbPrimary(), cm->dbVersion(), cm->getRoutingHistory(), atClusterTime->asTimestamp()); auto chunk = chunkManagerAtClusterTime.findIntersectingChunkWithSimpleCollation(shardKey); // Throws if the chunk has moved since the timestamp of the running transaction's atClusterTime diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp index 0d8597e9e98..a282455af15 100644 --- a/src/mongo/db/s/op_observer_sharding_test.cpp +++ b/src/mongo/db/s/op_observer_sharding_test.cpp @@ -66,7 +66,9 @@ CollectionMetadata makeAMetadata(BSONObj const& keyPattern) { auto rt = RoutingTableHistory::makeNew( kTestNss, UUID::gen(), KeyPattern(keyPattern), nullptr, false, epoch, {std::move(chunk)}); - return CollectionMetadata(ChunkManager(rt, Timestamp(100, 0)), ShardId("this")); + return CollectionMetadata( + ChunkManager(ShardId("this"), DatabaseVersion(UUID::gen(), 1), rt, Timestamp(100, 0)), + ShardId("this")); } class DeleteStateTest : public ShardServerTestFixture {}; diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index a8bdd46cf56..4af4b139430 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -104,9 +104,12 @@ public: AutoGetDb autoDb(operationContext(), kNss.db(), MODE_IX); Lock::CollectionLock collLock(operationContext(), kNss, MODE_IX); CollectionShardingRuntime::get(operationContext(), kNss) - ->setFilteringMetadata( - operationContext(), - CollectionMetadata(ChunkManager(rt, boost::none), ShardId("dummyShardId"))); + ->setFilteringMetadata(operationContext(), + CollectionMetadata(ChunkManager(ShardId("dummyShardId"), + DatabaseVersion(UUID::gen(), 1), + rt, + boost::none), + ShardId("dummyShardId"))); } UUID uuid() const { diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 1702c48ff8c..ab219ed34fb 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -321,14 +321,14 @@ CollectionMetadata forceGetCurrentMetadata(OperationContext* opCtx, const Namesp auto* const shardingState = ShardingState::get(opCtx); invariant(shardingState->canAcceptShardedCommands()); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss, true)); - if (!routingInfo.cm()) { + if (!cm.isSharded()) { return CollectionMetadata(); } - return CollectionMetadata(*routingInfo.cm(), shardingState->shardId()); + return CollectionMetadata(cm, shardingState->shardId()); } ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, @@ -344,13 +344,12 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, auto* const shardingState = ShardingState::get(opCtx); invariant(shardingState->canAcceptShardedCommands()); - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh( opCtx, nss, forceRefreshFromThisThread)); - auto cm = routingInfo.cm(); - if (!cm) { - // No chunk manager, so unsharded. Avoid using AutoGetCollection() as it returns the + if (!cm.isSharded()) { + // The collection is not sharded. Avoid using AutoGetCollection() as it returns the // InvalidViewDefinition error code if an invalid view is in the 'system.views' collection. AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); @@ -373,8 +372,8 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, if (optMetadata) { const auto& metadata = *optMetadata; if (metadata.isSharded() && - metadata.getCollVersion().epoch() == cm->getVersion().epoch() && - metadata.getCollVersion() >= cm->getVersion()) { + metadata.getCollVersion().epoch() == cm.getVersion().epoch() && + metadata.getCollVersion() >= cm.getVersion()) { LOGV2_DEBUG( 22063, 1, @@ -384,7 +383,7 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, "metadata", "namespace"_attr = nss, "latestCollectionVersion"_attr = metadata.getCollVersion(), - "refreshedCollectionVersion"_attr = cm->getVersion()); + "refreshedCollectionVersion"_attr = cm.getVersion()); return metadata.getShardVersion(); } } @@ -404,8 +403,8 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, if (optMetadata) { const auto& metadata = *optMetadata; if (metadata.isSharded() && - metadata.getCollVersion().epoch() == cm->getVersion().epoch() && - metadata.getCollVersion() >= cm->getVersion()) { + metadata.getCollVersion().epoch() == cm.getVersion().epoch() && + metadata.getCollVersion() >= cm.getVersion()) { LOGV2_DEBUG( 22064, 1, @@ -415,13 +414,13 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, "metadata", "namespace"_attr = nss, "latestCollectionVersion"_attr = metadata.getCollVersion(), - "refreshedCollectionVersion"_attr = cm->getVersion()); + "refreshedCollectionVersion"_attr = cm.getVersion()); return metadata.getShardVersion(); } } } - CollectionMetadata metadata(*cm, shardingState->shardId()); + CollectionMetadata metadata(cm, shardingState->shardId()); const auto newShardVersion = metadata.getShardVersion(); csr->setFilteringMetadata(opCtx, std::move(metadata)); diff --git a/src/mongo/db/s/shard_key_util.cpp b/src/mongo/db/s/shard_key_util.cpp index 2cae3071624..e216f9f682d 100644 --- a/src/mongo/db/s/shard_key_util.cpp +++ b/src/mongo/db/s/shard_key_util.cpp @@ -231,16 +231,16 @@ void ValidationBehaviorsShardCollection::createShardKeyIndex( ValidationBehaviorsRefineShardKey::ValidationBehaviorsRefineShardKey(OperationContext* opCtx, const NamespaceString& nss) : _opCtx(opCtx) { - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "refineCollectionShardKey namespace " << nss.toString() << " is not sharded", - routingInfo.cm()); - const auto minKeyShardId = routingInfo.cm()->getMinKeyShardIdWithSimpleCollation(); + cm.isSharded()); + const auto minKeyShardId = cm.getMinKeyShardIdWithSimpleCollation(); _indexShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId)); - _cm.emplace(*routingInfo.cm()); + _cm = std::move(cm); } std::vector<BSONObj> ValidationBehaviorsRefineShardKey::loadIndexes( diff --git a/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp b/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp index 71bf6d40005..7a9c6636d9b 100644 --- a/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp +++ b/src/mongo/s/build_versioned_requests_for_targeted_shards_test.cpp @@ -51,7 +51,7 @@ protected: * the expected vector. */ void runBuildVersionedRequestsExpect( - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const BSONObj& query, @@ -59,7 +59,7 @@ protected: const std::vector<AsyncRequestsSender::Request>& expectedRequests) { const auto actualRequests = buildVersionedRequestsForTargetedShards( - operationContext(), kNss, routingInfo, shardsToSkip, cmdObj, query, collation); + operationContext(), kNss, cm, shardsToSkip, cmdObj, query, collation); ASSERT_EQ(expectedRequests.size(), actualRequests.size()); _assertShardIdsMatch(expectedRequests, actualRequests); @@ -112,10 +112,10 @@ TEST_F(BuildVersionedRequestsForTargetedShardsTest, ReturnPrimaryShardForUnshard expectGetDatabaseUnsharded(); expectGetCollectionUnsharded(); - auto routingInfo = future.default_timed_get(); + auto cm = future.default_timed_get(); AsyncRequestsSender::Request expectedRequest{ShardId(_shards[0].getName()), {}}; - runBuildVersionedRequestsExpect(*routingInfo, {}, {}, {}, {}, {expectedRequest}); + runBuildVersionedRequestsExpect(*cm, {}, {}, {}, {}, {expectedRequest}); } TEST_F(BuildVersionedRequestsForTargetedShardsTest, @@ -125,9 +125,9 @@ TEST_F(BuildVersionedRequestsForTargetedShardsTest, expectGetDatabaseUnsharded(); expectGetCollectionUnsharded(); - auto routingInfo = future.default_timed_get(); + auto cm = future.default_timed_get(); - runBuildVersionedRequestsExpect(*routingInfo, {ShardId(_shards[0].getName())}, {}, {}, {}, {}); + runBuildVersionedRequestsExpect(*cm, {ShardId(_shards[0].getName())}, {}, {}, {}, {}); } } // namespace mongo diff --git a/src/mongo/s/catalog/type_shard.cpp b/src/mongo/s/catalog/type_shard.cpp index 325fc4c1c2c..36e8e931536 100644 --- a/src/mongo/s/catalog/type_shard.cpp +++ b/src/mongo/s/catalog/type_shard.cpp @@ -35,7 +35,6 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" -#include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index fd4ef9f5d06..c9bc5853f62 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -180,8 +180,8 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx } } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( - OperationContext* opCtx, const NamespaceString& nss) { +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) { return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo; } @@ -198,8 +198,9 @@ CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationCon } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoAt( - OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime) { +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + Timestamp atClusterTime) { return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo; } @@ -276,12 +277,10 @@ CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt( continue; } - return {CachedCollectionRoutingInfo(nss, - dbInfo, - collEntry->routingInfo - ? boost::optional<ChunkManager>(ChunkManager( - collEntry->routingInfo, atClusterTime)) - : boost::none), + return {ChunkManager(dbInfo.primaryId(), + dbInfo.databaseVersion(), + collEntry->routingInfo, + atClusterTime), refreshActionTaken}; } } @@ -293,7 +292,7 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon return getDatabase(opCtx, dbName); } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh( +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) { auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss); // We want to ensure that we don't join an in-progress refresh because that @@ -308,15 +307,18 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWi return refreshResult.statusWithInfo; } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( +StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss) { - auto routingInfoStatus = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo; - if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().cm()) { + auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo; + if (!swRoutingInfo.isOK()) + return swRoutingInfo; + + auto cri(std::move(swRoutingInfo.getValue())); + if (!cri.isSharded()) return {ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << nss.ns() << " is not sharded."}; - } - return routingInfoStatus; + return cri; } void CatalogCache::onStaleDatabaseVersion(const StringData dbName, @@ -333,48 +335,6 @@ void CatalogCache::onStaleDatabaseVersion(const StringData dbName, } } -void CatalogCache::onStaleShardVersion(CachedCollectionRoutingInfo&& ccriToInvalidate, - const ShardId& staleShardId) { - _stats.countStaleConfigErrors.addAndFetch(1); - - // Ensure the move constructor of CachedCollectionRoutingInfo is invoked in order to clear the - // input argument so it can't be used anymore - auto ccri(ccriToInvalidate); - - if (!ccri._cm) { - // We received StaleShardVersion for a collection we thought was unsharded. The collection - // must have become sharded. - onEpochChange(ccri._nss); - return; - } - - // We received StaleShardVersion for a collection we thought was sharded. Either a migration - // occurred to or from a shard we contacted, or the collection was dropped. - stdx::lock_guard<Latch> lg(_mutex); - - const auto nss = ccri._cm->getns(); - const auto itDb = _collectionsByDb.find(nss.db()); - if (itDb == _collectionsByDb.end()) { - // The database was dropped. - return; - } - - auto itColl = itDb->second.find(nss.ns()); - if (itColl == itDb->second.end()) { - // The collection was dropped. - } else if (itColl->second->needsRefresh && itColl->second->epochHasChanged) { - // If the epoch has changed, this implies that all routing requests have already been - // marked to block behind the next catalog cache refresh. We do not need to mark the shard - // as stale in this case. - return; - } else if (itColl->second->routingInfo->getVersion() == ccri._cm->getVersion()) { - // If the versions match, the last version of the routing information that we used is no - // longer valid, so trigger a refresh. - itColl->second->needsRefresh = true; - itColl->second->routingInfo->setShardStale(staleShardId); - } -} - void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx, bool shouldBlock) { if (gEnableFinerGrainedCatalogCacheRefresh) { @@ -887,9 +847,4 @@ std::string ComparableChunkVersion::toString() const { return toBSON().toString(); } -CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(NamespaceString nss, - CachedDatabaseInfo db, - boost::optional<ChunkManager> cm) - : _nss(std::move(nss)), _db(std::move(db)), _cm(std::move(cm)) {} - } // namespace mongo diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index ca06eb85e6b..a957189183a 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -29,17 +29,13 @@ #pragma once -#include <memory> - #include "mongo/base/string_data.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/chunk_manager.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" -#include "mongo/s/database_version_gen.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/concurrency/with_lock.h" @@ -50,7 +46,6 @@ namespace mongo { class BSONObjBuilder; class CachedDatabaseInfo; -class CachedCollectionRoutingInfo; class OperationContext; static constexpr int kMaxNumStaleVersionRetries = 10; @@ -242,6 +237,7 @@ public: private: friend class CatalogCache; + CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard); DatabaseType _dbt; @@ -249,45 +245,6 @@ private: }; /** - * Constructed exclusively by the CatalogCache. - * - * This RoutingInfo can be considered a "package" of routing info for the database and for the - * collection. Once unsharded collections are treated as sharded collections with a single chunk, - * they will also have a ChunkManager with a "chunk distribution." At that point, this "package" can - * be dismantled: routing for commands that route by database can directly retrieve the - * CachedDatabaseInfo, while routing for commands that route by collection can directly retrieve the - * ChunkManager. - */ -class CachedCollectionRoutingInfo { -public: - CachedDatabaseInfo db() const { - return _db; - } - - const ChunkManager* cm() const { - return _cm.get_ptr(); - } - -private: - friend class CatalogCache; - friend class CachedDatabaseInfo; - - CachedCollectionRoutingInfo(NamespaceString nss, - CachedDatabaseInfo db, - boost::optional<ChunkManager> cm); - - NamespaceString _nss; - - // Copy of the database's cached info. - CachedDatabaseInfo _db; - - // Shared reference to the collection's cached chunk distribution if sharded, otherwise - // boost::none. This is a shared reference rather than a copy because the chunk distribution can - // be large. - boost::optional<ChunkManager> _cm; -}; - -/** * This is the root of the "read-only" hierarchy of cached catalog metadata. It is read only * in the sense that it only reads from the persistent store, but never writes to it. Instead * writes happen through the ShardingCatalogManager and the cache hierarchy needs to be invalidated. @@ -318,9 +275,9 @@ public: * If the given atClusterTime is so far in the past that it is not possible to construct routing * info, returns a StaleClusterTime error. */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoAt(OperationContext* opCtx, - const NamespaceString& nss, - Timestamp atClusterTime); + StatusWith<ChunkManager> getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + Timestamp atClusterTime); /** * Same as the getCollectionRoutingInfoAt call above, but returns the latest known routing @@ -330,8 +287,8 @@ public: * guaranteed to never return StaleClusterTime, because the latest routing information should * always be available. */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss); + StatusWith<ChunkManager> getCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss); /** * Same as getDatbase above, but in addition forces the database entry to be refreshed. @@ -352,7 +309,7 @@ public: * collection version to decide when a refresh is necessary and provide * proper causal consistency */ - StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoWithRefresh( + StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread = false); @@ -361,8 +318,8 @@ public: * Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a * NamespaceNotSharded error if the collection is not sharded. */ - StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh( - OperationContext* opCtx, const NamespaceString& nss); + StatusWith<ChunkManager> getShardedCollectionRoutingInfoWithRefresh(OperationContext* opCtx, + const NamespaceString& nss); /** * Advances the version in the cache for the given database. @@ -376,14 +333,6 @@ public: const boost::optional<DatabaseVersion>& wantedVersion); /** - * Non-blocking method that marks the current cached collection entry as needing refresh if its - * collectionVersion matches the input's ChunkManager's collectionVersion. - * - * To be called if using the input routing info caused a StaleShardVersion to be received. - */ - void onStaleShardVersion(CachedCollectionRoutingInfo&&, const ShardId& staleShardId); - - /** * Gets whether this operation should block behind a catalog cache refresh. */ static bool getOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx); @@ -470,7 +419,6 @@ public: private: // Make the cache entries friends so they can access the private classes below friend class CachedDatabaseInfo; - friend class CachedCollectionRoutingInfo; /** * Cache entry describing a collection. @@ -573,7 +521,7 @@ private: */ struct RefreshResult { // Status containing result of refresh - StatusWith<CachedCollectionRoutingInfo> statusWithInfo; + StatusWith<ChunkManager> statusWithInfo; RefreshAction actionTaken; }; diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp index de1fb9e7388..27a62bd9749 100644 --- a/src/mongo/s/catalog_cache_refresh_test.cpp +++ b/src/mongo/s/catalog_cache_refresh_test.cpp @@ -117,11 +117,9 @@ TEST_F(CatalogCacheRefreshTest, FullLoad) { chunk4.toConfigBSON()}; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(4, cm->numChunks()); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(4, cm.numChunks()); } TEST_F(CatalogCacheRefreshTest, NoLoadIfShardNotMarkedStaleInOperationContext) { @@ -131,10 +129,9 @@ TEST_F(CatalogCacheRefreshTest, NoLoadIfShardNotMarkedStaleInOperationContext) { ASSERT_EQ(2, initialRoutingInfo.numChunks()); auto futureNoRefresh = scheduleRoutingInfoUnforcedRefresh(kNss); - auto routingInfo = futureNoRefresh.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - ASSERT_EQ(2, cm->numChunks()); + auto cm = *futureNoRefresh.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); } class MockLockerAlwaysReportsToBeLocked : public LockerNoop { @@ -160,12 +157,8 @@ TEST_F(CatalogCacheRefreshTest, DatabaseNotFound) { expectFindSendBSONObjVector(kConfigHostAndPort, {}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - - FAIL(str::stream() << "Returning no database did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + auto cm = *future.default_timed_get(); + FAIL(str::stream() << "Returning no database did not fail and returned " << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NamespaceNotFound, ex.code()); } @@ -181,12 +174,9 @@ TEST_F(CatalogCacheRefreshTest, DatabaseBSONCorrupted) { << "This value should not be in a database config document")}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning corrupted database entry did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code()); } @@ -200,10 +190,9 @@ TEST_F(CatalogCacheRefreshTest, CollectionNotFound) { // Return an empty collection expectFindSendBSONObjVector(kConfigHostAndPort, {}); - auto routingInfo = future.default_timed_get(); - ASSERT(!routingInfo->cm()); - ASSERT(routingInfo->db().primary()); - ASSERT_EQ(ShardId{"0"}, routingInfo->db().primaryId()); + auto cm = *future.default_timed_get(); + ASSERT(!cm.isSharded()); + ASSERT_EQ(ShardId{"0"}, cm.dbPrimary()); } TEST_F(CatalogCacheRefreshTest, CollectionBSONCorrupted) { @@ -218,12 +207,9 @@ TEST_F(CatalogCacheRefreshTest, CollectionBSONCorrupted) { << "This value should not be in a collection config document")}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning corrupted collection entry did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code()); } @@ -248,12 +234,9 @@ TEST_F(CatalogCacheRefreshTest, FullLoadNoChunksFound) { expectFindSendBSONObjVector(kConfigHostAndPort, {}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning no chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -280,12 +263,9 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadNoChunksFound) { expectFindSendBSONObjVector(kConfigHostAndPort, {}); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning no chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -313,12 +293,9 @@ TEST_F(CatalogCacheRefreshTest, ChunksBSONCorrupted) { }()); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning no chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code()); } @@ -370,13 +347,10 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithLowestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -428,13 +402,10 @@ TEST_F(CatalogCacheRefreshTest, FullLoadMissingChunkWithHighestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -488,13 +459,10 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -547,13 +515,10 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) { expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL( str::stream() << "Returning incomplete chunks for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -595,13 +560,10 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) { expectFindSendBSONObjVector(kConfigHostAndPort, inconsistentChunks); try { - auto routingInfo = future.default_timed_get(); - auto cm = routingInfo->cm(); - auto primary = routingInfo->db().primary(); - + auto cm = *future.default_timed_get(); FAIL(str::stream() << "Returning chunks with different epoch for collection did not fail and returned " - << (cm ? cm->toString() : routingInfo->db().primaryId().toString())); + << cm.toString()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code()); } @@ -684,14 +646,12 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()}; }); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(3, cm->numChunks()); - ASSERT_EQ(newVersion, cm->getVersion()); - ASSERT_EQ(ChunkVersion(5, 1, newVersion.epoch()), cm->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(5, 2, newVersion.epoch()), cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(3, cm.numChunks()); + ASSERT_EQ(newVersion, cm.getVersion()); + ASSERT_EQ(ChunkVersion(5, 1, newVersion.epoch()), cm.getVersion({"0"})); + ASSERT_EQ(ChunkVersion(5, 2, newVersion.epoch()), cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { @@ -733,14 +693,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(2, cm->numChunks()); - ASSERT_EQ(newVersion, cm->getVersion()); - ASSERT_EQ(ChunkVersion(1, 0, newVersion.epoch()), cm->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(1, 1, newVersion.epoch()), cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); + ASSERT_EQ(newVersion, cm.getVersion()); + ASSERT_EQ(ChunkVersion(1, 0, newVersion.epoch()), cm.getVersion({"0"})); + ASSERT_EQ(ChunkVersion(1, 1, newVersion.epoch()), cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { @@ -778,14 +736,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(2, cm->numChunks()); - ASSERT_EQ(version, cm->getVersion()); - ASSERT_EQ(version, cm->getVersion({"0"})); - ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); + ASSERT_EQ(version, cm.getVersion()); + ASSERT_EQ(version, cm.getVersion({"0"})); + ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMove) { @@ -819,14 +775,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMove) { return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(2, cm->numChunks()); - ASSERT_EQ(version, cm->getVersion()); - ASSERT_EQ(version, cm->getVersion({"0"})); - ASSERT_EQ(expectedDestShardVersion, cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(2, cm.numChunks()); + ASSERT_EQ(version, cm.getVersion()); + ASSERT_EQ(version, cm.getVersion({"0"})); + ASSERT_EQ(expectedDestShardVersion, cm.getVersion({"1"})); } TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunk) { @@ -856,14 +810,12 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunk) { return std::vector<BSONObj>{chunk1.toConfigBSON()}; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - auto cm = routingInfo->cm(); - - ASSERT_EQ(1, cm->numChunks()); - ASSERT_EQ(version, cm->getVersion()); - ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm->getVersion({"0"})); - ASSERT_EQ(version, cm->getVersion({"1"})); + auto cm = *future.default_timed_get(); + ASSERT(cm.isSharded()); + ASSERT_EQ(1, cm.numChunks()); + ASSERT_EQ(version, cm.getVersion()); + ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm.getVersion({"0"})); + ASSERT_EQ(version, cm.getVersion({"1"})); } } // namespace diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index a3254db64ab..ba9bb4a4ddb 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -59,7 +59,7 @@ void CatalogCacheTestFixture::setUp() { CollatorFactoryInterface::set(getServiceContext(), std::make_unique<CollatorFactoryMock>()); } -executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> +executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> CatalogCacheTestFixture::scheduleRoutingInfoForcedRefresh(const NamespaceString& nss) { return launchAsync([this, nss] { auto client = getServiceContext()->makeClient("Test"); @@ -70,13 +70,13 @@ CatalogCacheTestFixture::scheduleRoutingInfoForcedRefresh(const NamespaceString& }); } -executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> +executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> CatalogCacheTestFixture::scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss) { return launchAsync([this, nss] { auto client = getServiceContext()->makeClient("Test"); auto const catalogCache = Grid::get(getServiceContext())->catalogCache(); - return boost::make_optional( + return boost::optional<ChunkManager>( uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss))); }); } @@ -160,11 +160,7 @@ ChunkManager CatalogCacheTestFixture::makeChunkManager( expectFindSendBSONObjVector(kConfigHostAndPort, {collectionBSON}); expectFindSendBSONObjVector(kConfigHostAndPort, initialChunks); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); - ASSERT(routingInfo->db().primary()); - - return *routingInfo->cm(); + return *future.default_timed_get(); } void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss, std::string shardId) { @@ -188,13 +184,13 @@ void CatalogCacheTestFixture::expectGetCollection(NamespaceString nss, }()); } -CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards( +ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards( NamespaceString nss) { return loadRoutingTableWithTwoChunksAndTwoShardsImpl(nss, BSON("_id" << 1)); } -CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsHash( +ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsHash( NamespaceString nss) { return loadRoutingTableWithTwoChunksAndTwoShardsImpl(nss, @@ -202,7 +198,7 @@ CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChun << "hashed")); } -CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsImpl( +ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsImpl( NamespaceString nss, const BSONObj& shardKey) { const OID epoch = OID::gen(); const ShardKeyPattern shardKeyPattern(shardKey); @@ -230,7 +226,7 @@ CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChun return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; }()); - return future.default_timed_get().get(); + return *future.default_timed_get(); } } // namespace mongo diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h index d17d81e6a93..412e2455911 100644 --- a/src/mongo/s/catalog_cache_test_fixture.h +++ b/src/mongo/s/catalog_cache_test_fixture.h @@ -39,11 +39,6 @@ namespace mongo { -class BSONObj; -class ChunkManager; -class CollatorInterface; -class ShardKeyPattern; - class CatalogCacheTestFixture : public ShardingTestFixture { protected: void setUp() override; @@ -70,7 +65,7 @@ protected: * std::future with the MSVC STL library, which requires the templated type to be default * constructible. */ - executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> + executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> scheduleRoutingInfoForcedRefresh(const NamespaceString& nss); /** @@ -84,7 +79,7 @@ protected: * std::future with the MSVC STL library, which requires the templated type to be default * constructible. */ - executor::NetworkTestEnv::FutureHandle<boost::optional<CachedCollectionRoutingInfo>> + executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss); /** @@ -99,18 +94,18 @@ protected: * Triggers a refresh for the given namespace and mocks network calls to simulate loading * metadata with two chunks: [minKey, 0) and [0, maxKey) on two shards with ids: "0" and "1". */ - CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShards(NamespaceString nss); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShards(NamespaceString nss); /** * Same as the above method but the sharding key is hashed. */ - CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShardsHash(NamespaceString nss); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShardsHash(NamespaceString nss); /** * The common implementation for any shard key. */ - CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShardsImpl( - NamespaceString nss, const BSONObj& shardKey); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString nss, + const BSONObj& shardKey); /** * Mocks network responses for loading a sharded database and collection from the config server. diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 3107a05a7a0..a159b3f0ca0 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -338,7 +338,7 @@ Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj for (BSONElement elt : shardKey) { uassert(ErrorCodes::ShardKeyNotFound, str::stream() << "Cannot target single shard due to collation of key " - << elt.fieldNameStringData() << " for namespace " << getns(), + << elt.fieldNameStringData() << " for namespace " << _rt->nss(), !CollationIndexKey::isCollatableType(elt.type())); } } @@ -347,7 +347,7 @@ Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj uassert(ErrorCodes::ShardKeyNotFound, str::stream() << "Cannot target single shard using key " << shardKey - << " for namespace " << getns(), + << " for namespace " << _rt->nss(), chunkInfo && chunkInfo->containsKey(shardKey)); return Chunk(*chunkInfo, _clusterTime); @@ -370,7 +370,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e const BSONObj& query, const BSONObj& collation, std::set<ShardId>* shardIds) const { - auto qr = std::make_unique<QueryRequest>(_rt->getns()); + auto qr = std::make_unique<QueryRequest>(_rt->nss()); qr->setFilter(query); if (auto uuid = getUUID()) @@ -645,6 +645,10 @@ IndexBounds ChunkManager::collapseQuerySolution(const QuerySolutionNode* node) { return bounds; } +std::string ChunkManager::toString() const { + return _rt ? _rt->toString() : "UNSHARDED"; +} + bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other, const ShardId& shardName) const { // Return true if the shard version is the same in the two chunk managers diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index a5863b064b2..4ac16f5fa18 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -38,6 +38,7 @@ #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" +#include "mongo/s/database_version_gen.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/ticketholder.h" @@ -46,15 +47,13 @@ namespace mongo { class CanonicalQuery; struct QuerySolutionNode; -class OperationContext; class ChunkManager; struct ShardVersionTargetingInfo { - // Indicates whether the shard is stale and thus needs a catalog cache refresh. Is false by - // default. - AtomicWord<bool> isStale; + // Indicates whether the shard is stale and thus needs a catalog cache refresh + AtomicWord<bool> isStale{false}; - // Max chunk version for the shard. + // Max chunk version for the shard ChunkVersion shardVersion; ShardVersionTargetingInfo(const OID& epoch); @@ -64,9 +63,11 @@ struct ShardVersionTargetingInfo { // shard is currently marked as needing a catalog cache refresh (stale). using ShardVersionMap = stdx::unordered_map<ShardId, ShardVersionTargetingInfo, ShardId::Hasher>; -// This class serves as a Facade around how the mapping of ranges to chunks is represented. It also -// provides a simpler, high-level interface for domain specific operations without exposing the -// underlying implementation. +/** + * This class serves as a Facade around how the mapping of ranges to chunks is represented. It also + * provides a simpler, high-level interface for domain specific operations without exposing the + * underlying implementation. + */ class ChunkMap { // Vector of chunks ordered by max key. using ChunkVector = std::vector<std::shared_ptr<ChunkInfo>>; @@ -168,7 +169,7 @@ public: */ std::shared_ptr<RoutingTableHistory> makeUpdated(const std::vector<ChunkType>& changedChunks); - const NamespaceString& getns() const { + const NamespaceString& nss() const { return _nss; } @@ -261,6 +262,8 @@ public: } private: + friend class ChunkManager; + RoutingTableHistory(NamespaceString nss, boost::optional<UUID> uuid, KeyPattern shardKeyPattern, @@ -294,8 +297,6 @@ private: // Note: this declaration must not be moved before _chunkMap since it is initialized by using // the _chunkMap instance. ShardVersionMap _shardVersions; - - friend class ChunkManager; }; /** @@ -303,13 +304,37 @@ private: */ class ChunkManager { public: - ChunkManager(std::shared_ptr<RoutingTableHistory> rt, boost::optional<Timestamp> clusterTime) - : _rt(std::move(rt)), _clusterTime(std::move(clusterTime)) {} + ChunkManager(ShardId dbPrimary, + DatabaseVersion dbVersion, + std::shared_ptr<RoutingTableHistory> rt, + boost::optional<Timestamp> clusterTime) + : _dbPrimary(std::move(dbPrimary)), + _dbVersion(std::move(dbVersion)), + _rt(std::move(rt)), + _clusterTime(std::move(clusterTime)) {} + + // Methods supported on both sharded and unsharded collections + + bool isSharded() const { + return bool(_rt); + } + + const ShardId& dbPrimary() const { + return _dbPrimary; + } + + const DatabaseVersion& dbVersion() const { + return _dbVersion; + } - const NamespaceString& getns() const { - return _rt->getns(); + int numChunks() const { + return _rt ? _rt->numChunks() : 1; } + std::string toString() const; + + // Methods only supported on sharded collections (caller must check isSharded()) + const ShardKeyPattern& getShardKeyPattern() const { return _rt->getShardKeyPattern(); } @@ -345,10 +370,6 @@ public: }); } - int numChunks() const { - return _rt->numChunks(); - } - /** * Returns true if a document with the given "shardKey" is owned by the shard with the given * "shardId" in this routing table. If "shardKey" is empty returns false. If "shardKey" is not a @@ -452,10 +473,6 @@ public: return _rt->compatibleWith(*other._rt, shard); } - std::string toString() const { - return _rt->toString(); - } - bool uuidMatches(UUID uuid) const { return _rt->uuidMatches(uuid); } @@ -469,7 +486,11 @@ public: } private: + ShardId _dbPrimary; + DatabaseVersion _dbVersion; + std::shared_ptr<RoutingTableHistory> _rt; + boost::optional<Timestamp> _clusterTime; }; diff --git a/src/mongo/s/chunk_manager_query_test.cpp b/src/mongo/s/chunk_manager_query_test.cpp index 0e40ed48676..823166d96e9 100644 --- a/src/mongo/s/chunk_manager_query_test.cpp +++ b/src/mongo/s/chunk_manager_query_test.cpp @@ -523,7 +523,8 @@ TEST_F(ChunkManagerQueryTest, SnapshotQueryWithMoreShardsThanLatestMetadata) { ChunkHistory(Timestamp(1, 0), ShardId("1"))}); auto newRoutingTable = oldRoutingTable->makeUpdated({chunk1}); - ChunkManager chunkManager(newRoutingTable, Timestamp(5, 0)); + ChunkManager chunkManager( + ShardId("0"), DatabaseVersion(UUID::gen(), 1), newRoutingTable, Timestamp(5, 0)); std::set<ShardId> shardIds; chunkManager.getShardIdsForRange(BSON("x" << MINKEY), BSON("x" << MAXKEY), &shardIds); diff --git a/src/mongo/s/chunk_manager_refresh_bm.cpp b/src/mongo/s/chunk_manager_refresh_bm.cpp index a5bc812c78a..41a33a964b6 100644 --- a/src/mongo/s/chunk_manager_refresh_bm.cpp +++ b/src/mongo/s/chunk_manager_refresh_bm.cpp @@ -41,6 +41,8 @@ namespace mongo { namespace { +const NamespaceString kNss("test", "foo"); + ChunkRange getRangeForChunk(int i, int nChunks) { invariant(i >= 0); invariant(nChunks > 0); @@ -55,24 +57,27 @@ ChunkRange getRangeForChunk(int i, int nChunks) { } template <typename ShardSelectorFn> -auto makeChunkManagerWithShardSelector(int nShards, uint32_t nChunks, ShardSelectorFn selectShard) { +CollectionMetadata makeChunkManagerWithShardSelector(int nShards, + uint32_t nChunks, + ShardSelectorFn selectShard) { const auto collEpoch = OID::gen(); - const auto collName = NamespaceString("test.foo"); const auto shardKeyPattern = KeyPattern(BSON("_id" << 1)); std::vector<ChunkType> chunks; chunks.reserve(nChunks); for (uint32_t i = 0; i < nChunks; ++i) { - chunks.emplace_back(collName, + chunks.emplace_back(kNss, getRangeForChunk(i, nChunks), ChunkVersion{i + 1, 0, collEpoch}, selectShard(i, nShards, nChunks)); } auto routingTableHistory = RoutingTableHistory::makeNew( - collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, chunks); - return std::make_unique<CollectionMetadata>(ChunkManager(routingTableHistory, boost::none), - ShardId("shard0")); + kNss, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, chunks); + return CollectionMetadata( + ChunkManager( + ShardId("Shard0"), DatabaseVersion(UUID::gen(), 1), routingTableHistory, boost::none), + ShardId("shard0")); } ShardId pessimalShardSelector(int i, int nShards, int nChunks) { @@ -98,22 +103,22 @@ MONGO_COMPILER_NOINLINE auto makeChunkManagerWithOptimalBalancedDistribution(int MONGO_COMPILER_NOINLINE auto runIncrementalUpdate(const CollectionMetadata& cm, const std::vector<ChunkType>& newChunks) { auto rt = cm.getChunkManager()->getRoutingHistory()->makeUpdated(newChunks); - return std::make_unique<CollectionMetadata>(ChunkManager(rt, boost::none), ShardId("shard0")); + return std::make_unique<CollectionMetadata>( + ChunkManager(ShardId("shard0"), DatabaseVersion(UUID::gen(), 1), rt, boost::none), + ShardId("shard0")); } void BM_IncrementalRefreshWithNoChange(benchmark::State& state) { const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeChunkManagerWithOptimalBalancedDistribution(nShards, nChunks); + auto metadata = makeChunkManagerWithOptimalBalancedDistribution(nShards, nChunks); - auto postMoveVersion = cm->getChunkManager()->getVersion(); - const auto collName = NamespaceString(cm->getChunkManager()->getns()); + auto postMoveVersion = metadata.getChunkManager()->getVersion(); std::vector<ChunkType> newChunks; - newChunks.emplace_back( - collName, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); + newChunks.emplace_back(kNss, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); for (auto keepRunning : state) { - benchmark::DoNotOptimize(runIncrementalUpdate(*cm, newChunks)); + benchmark::DoNotOptimize(runIncrementalUpdate(metadata, newChunks)); } } @@ -125,20 +130,17 @@ BENCHMARK(BM_IncrementalRefreshWithNoChange) void BM_IncrementalRefreshOfPessimalBalancedDistribution(benchmark::State& state) { const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeChunkManagerWithPessimalBalancedDistribution(nShards, nChunks); + auto metadata = makeChunkManagerWithPessimalBalancedDistribution(nShards, nChunks); - auto postMoveVersion = cm->getChunkManager()->getVersion(); - const auto collName = NamespaceString(cm->getChunkManager()->getns()); + auto postMoveVersion = metadata.getChunkManager()->getVersion(); std::vector<ChunkType> newChunks; postMoveVersion.incMajor(); - newChunks.emplace_back( - collName, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); + newChunks.emplace_back(kNss, getRangeForChunk(1, nChunks), postMoveVersion, ShardId("shard0")); postMoveVersion.incMajor(); - newChunks.emplace_back( - collName, getRangeForChunk(3, nChunks), postMoveVersion, ShardId("shard1")); + newChunks.emplace_back(kNss, getRangeForChunk(3, nChunks), postMoveVersion, ShardId("shard1")); for (auto keepRunning : state) { - benchmark::DoNotOptimize(runIncrementalUpdate(*cm, newChunks)); + benchmark::DoNotOptimize(runIncrementalUpdate(metadata, newChunks)); } } @@ -168,8 +170,11 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS for (auto keepRunning : state) { auto routingTableHistory = RoutingTableHistory::makeNew( collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, chunks); - benchmark::DoNotOptimize( - CollectionMetadata(ChunkManager(routingTableHistory, boost::none), ShardId("shard0"))); + benchmark::DoNotOptimize(CollectionMetadata(ChunkManager(ShardId("shard0"), + DatabaseVersion(UUID::gen(), 1), + routingTableHistory, + boost::none), + ShardId("shard0"))); } } @@ -257,13 +262,13 @@ void BM_FindIntersectingChunk(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto keysIter = makeCircularIterator(keys); for (auto keepRunning : state) { benchmark::DoNotOptimize( - cm->getChunkManager()->findIntersectingChunkWithSimpleCollation(*keysIter)); + metadata.getChunkManager()->findIntersectingChunkWithSimpleCollation(*keysIter)); ++keysIter; } @@ -276,14 +281,14 @@ void BM_GetShardIdsForRange(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto ranges = makeRanges(keys); auto rangesIter = makeCircularIterator(ranges); for (auto keepRunning : state) { std::set<ShardId> shardIds; - cm->getChunkManager()->getShardIdsForRange( + metadata.getChunkManager()->getShardIdsForRange( rangesIter->first, rangesIter->second, &shardIds); ++rangesIter; } @@ -297,13 +302,13 @@ void BM_GetShardIdsForRangeMinKeyToMaxKey(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto min = BSON("_id" << MINKEY); auto max = BSON("_id" << MAXKEY); for (auto keepRunning : state) { std::set<ShardId> shardIds; - cm->getChunkManager()->getShardIdsForRange(min, max, &shardIds); + metadata.getChunkManager()->getShardIdsForRange(min, max, &shardIds); } state.SetItemsProcessed(state.iterations()); @@ -315,14 +320,14 @@ void BM_KeyBelongsToMe(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto keysIter = makeCircularIterator(keys); size_t nOwned = 0; for (auto keepRunning : state) { - if (cm->keyBelongsToMe(*keysIter)) { + if (metadata.keyBelongsToMe(*keysIter)) { ++nOwned; } ++keysIter; @@ -338,7 +343,7 @@ void BM_RangeOverlapsChunk(benchmark::State& state, const int nShards = state.range(0); const int nChunks = state.range(1); - auto cm = makeCollectionMetadata(nShards, nChunks); + auto metadata = makeCollectionMetadata(nShards, nChunks); auto keys = makeKeys(nChunks); auto ranges = makeRanges(keys); auto rangesIter = makeCircularIterator(ranges); @@ -346,7 +351,7 @@ void BM_RangeOverlapsChunk(benchmark::State& state, size_t nOverlapped = 0; for (auto keepRunning : state) { - if (cm->rangeOverlapsChunk(ChunkRange(rangesIter->first, rangesIter->second))) { + if (metadata.rangeOverlapsChunk(ChunkRange(rangesIter->first, rangesIter->second))) { ++nOverlapped; } ++rangesIter; diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index a282a1c548e..ac9c9900281 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -376,7 +376,7 @@ BSONObj stripWriteConcern(const BSONObj& cmdObj) { std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const BSONObj& query, @@ -384,24 +384,24 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard auto cmdToSend = cmdObj; - if (!routingInfo.cm()) { + if (!cm.isSharded()) { // The collection is unsharded. Target only the primary shard for the database. - const auto primaryShardId = routingInfo.db().primaryId(); + const auto primaryShardId = cm.dbPrimary(); if (shardsToSkip.find(primaryShardId) != shardsToSkip.end()) { return {}; } // Attach shardVersion "UNSHARDED", unless targeting the config server. - const auto cmdObjWithShardVersion = (primaryShardId != "config") + const auto cmdObjWithShardVersion = (primaryShardId != ShardRegistry::kConfigServerShardId) ? appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED()) : cmdToSend; return buildUnshardedRequestsForAllShards( opCtx, {primaryShardId}, - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo.db())); + appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())); } std::vector<AsyncRequestsSender::Request> requests; @@ -415,12 +415,11 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard } auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(collator), nss); - routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); for (const ShardId& shardId : shardIds) { if (shardsToSkip.find(shardId) == shardsToSkip.end()) { - requests.emplace_back( - shardId, appendShardVersion(cmdToSend, routingInfo.cm()->getVersion(shardId))); + requests.emplace_back(shardId, appendShardVersion(cmdToSend, cm.getVersion(shardId))); } } @@ -441,14 +440,14 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, const BSONObj& query, const BSONObj& collation) { const auto requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, {} /* shardsToSkip */, cmdObj, query, collation); + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, query, collation); return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests); } @@ -458,7 +457,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, @@ -466,7 +465,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( const BSONObj& query, const BSONObj& collation) { const auto requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, shardsToSkip, cmdObj, query, collation); + opCtx, nss, cm, shardsToSkip, cmdObj, query, collation); return gatherResponsesNoThrowOnStaleShardVersionErrors( opCtx, dbName, readPref, retryPolicy, requests); @@ -479,18 +478,18 @@ std::vector<AsyncRequestsSender::Response> scatterGatherOnlyVersionIfUnsharded( const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, const std::set<ErrorCodes::Error>& ignorableErrors) { - auto routingInfo = + auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); std::vector<AsyncRequestsSender::Request> requests; - if (routingInfo.cm()) { + if (cm.isSharded()) { // An unversioned request on a sharded collection can cause a shard that has not owned data // for the collection yet to implicitly create the collection without all the collection // options. So, we signal to shards that they should not implicitly create the collection. requests = buildUnversionedRequestsForAllShards(opCtx, cmdObj); } else { requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, {} /* shardsToSkip */, cmdObj, BSONObj(), BSONObj()); + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, BSONObj(), BSONObj()); } return gatherResponses(opCtx, nss.db(), readPref, retryPolicy, requests); @@ -528,27 +527,21 @@ AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy) { - const auto query = routingInfo.cm() - ? routingInfo.cm()->getShardKeyPattern().getKeyPattern().globalMin() - : BSONObj(); + const auto query = + cm.isSharded() ? cm.getShardKeyPattern().getKeyPattern().globalMin() : BSONObj(); - auto responses = - gatherResponses(opCtx, - nss.db(), - readPref, - retryPolicy, - buildVersionedRequestsForTargetedShards(opCtx, - nss, - routingInfo, - {} /* shardsToSkip */, - cmdObj, - query, - BSONObj() /* collation */)); + auto responses = gatherResponses( + opCtx, + nss.db(), + readPref, + retryPolicy, + buildVersionedRequestsForTargetedShards( + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, query, BSONObj() /* collation */)); return std::move(responses.front()); } @@ -749,31 +742,31 @@ void createShardDatabase(OperationContext* opCtx, StringData dbName) { } std::set<ShardId> getTargetedShardsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& query, const BSONObj& collation) { - if (routingInfo.cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation. + if (cm.isSharded()) { + // The collection is sharded. Use the routing table to decide which shards to target based + // on the query and collation. std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + cm.getShardIdsForQuery(expCtx, query, collation, &shardIds); return shardIds; } // The collection is unsharded. Target only the primary shard for the database. - return {routingInfo.db().primaryId()}; + return {cm.dbPrimary()}; } std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const BSONObj& query, const BSONObj& collation) { std::vector<std::pair<ShardId, BSONObj>> requests; auto ars_requests = buildVersionedRequestsForTargetedShards( - opCtx, nss, routingInfo, {} /* shardsToSkip */, cmdObj, query, collation); + opCtx, nss, cm, {} /* shardsToSkip */, cmdObj, query, collation); std::transform(std::make_move_iterator(ars_requests.begin()), std::make_move_iterator(ars_requests.end()), std::back_inserter(requests), @@ -784,8 +777,8 @@ std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( return requests; } -StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( - OperationContext* opCtx, const NamespaceString& nss) { +StatusWith<ChunkManager> getCollectionRoutingInfoForTxnCmd(OperationContext* opCtx, + const NamespaceString& nss) { auto catalogCache = Grid::get(opCtx)->catalogCache(); invariant(catalogCache); @@ -808,29 +801,29 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( StatusWith<Shard::QueryResponse> loadIndexesFromAuthoritativeShard(OperationContext* opCtx, const NamespaceString& nss) { - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); auto [indexShard, listIndexesCmd] = [&]() -> std::pair<std::shared_ptr<Shard>, BSONObj> { auto cmdNoVersion = applyReadWriteConcern( opCtx, true /* appendRC */, false /* appendWC */, BSON("listIndexes" << nss.coll())); - if (routingInfo.cm()) { + if (cm.isSharded()) { // For a sharded collection we must load indexes from a shard with chunks. For // consistency with cluster listIndexes, load from the shard that owns the minKey chunk. - const auto minKeyShardId = routingInfo.cm()->getMinKeyShardIdWithSimpleCollation(); - auto minKeyShard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId)); - return {minKeyShard, - appendShardVersion(cmdNoVersion, routingInfo.cm()->getVersion(minKeyShardId))}; + const auto minKeyShardId = cm.getMinKeyShardIdWithSimpleCollation(); + return { + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId)), + appendShardVersion(cmdNoVersion, cm.getVersion(minKeyShardId))}; } else { // For an unsharded collection, the primary shard will have correct indexes. We attach // unsharded shard version to detect if the collection has become sharded. const auto cmdObjWithShardVersion = - (routingInfo.db().primaryId() != ShardRegistry::kConfigServerShardId) + (cm.dbPrimary() != ShardRegistry::kConfigServerShardId) ? appendShardVersion(cmdNoVersion, ChunkVersion::UNSHARDED()) : cmdNoVersion; - return {routingInfo.db().primary(), - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo.db())}; + return { + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())), + appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}; } }(); diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 243e0d5b7d7..11985ac9682 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -87,7 +87,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTarg std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const BSONObj& query, @@ -192,7 +192,7 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy, @@ -215,7 +215,7 @@ scatterGatherVersionedTargetByRoutingTableNoThrowOnStaleShardVersionErrors( OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardsToSkip, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, @@ -275,7 +275,7 @@ AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const ReadPreferenceSetting& readPref, Shard::RetryPolicy retryPolicy); @@ -336,7 +336,7 @@ void createShardDatabase(OperationContext* opCtx, StringData dbName); * info. */ std::set<ShardId> getTargetedShardsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& query, const BSONObj& collation); @@ -347,7 +347,7 @@ std::set<ShardId> getTargetedShardsForQuery(boost::intrusive_ptr<ExpressionConte std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, const BSONObj& query, const BSONObj& collation); @@ -360,8 +360,8 @@ std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards( * * Should be used by all router commands that can be run in a transaction when targeting shards. */ -StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( - OperationContext* opCtx, const NamespaceString& nss); +StatusWith<ChunkManager> getCollectionRoutingInfoForTxnCmd(OperationContext* opCtx, + const NamespaceString& nss); /** * Loads all of the indexes for the given namespace from the appropriate shard. For unsharded diff --git a/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp b/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp index 9fd74b44b71..0750a0b4b47 100644 --- a/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp +++ b/src/mongo/s/commands/cluster_clear_jumbo_flag_cmd.cpp @@ -73,10 +73,9 @@ public: } void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, ns())); - const auto cm = routingInfo.cm(); uassert(ErrorCodes::InvalidOptions, "bounds can only have exactly 2 elements", @@ -93,21 +92,20 @@ public: boost::optional<Chunk> chunk; if (request().getFind()) { - BSONObj shardKey = - uassertStatusOK(cm->getShardKeyPattern().extractShardKeyFromQuery( - opCtx, ns(), *request().getFind())); + BSONObj shardKey = uassertStatusOK(cm.getShardKeyPattern().extractShardKeyFromQuery( + opCtx, ns(), *request().getFind())); uassert(51260, str::stream() << "no shard key found in chunk query " << *request().getFind(), !shardKey.isEmpty()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(shardKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(shardKey)); } else { auto boundsArray = *request().getBounds(); - BSONObj minKey = cm->getShardKeyPattern().normalizeShardKey(boundsArray.front()); - BSONObj maxKey = cm->getShardKeyPattern().normalizeShardKey(boundsArray.back()); + BSONObj minKey = cm.getShardKeyPattern().normalizeShardKey(boundsArray.front()); + BSONObj maxKey = cm.getShardKeyPattern().normalizeShardKey(boundsArray.back()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(minKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(minKey)); uassert(51261, str::stream() << "no chunk found with the shard key bounds " @@ -117,7 +115,7 @@ public: } ConfigsvrClearJumboFlag configCmd( - ns(), cm->getVersion().epoch(), chunk->getMin(), chunk->getMax()); + ns(), cm.getVersion().epoch(), chunk->getMin(), chunk->getMax()); configCmd.setDbName(request().getDbName()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); diff --git a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp index c493f2a72f5..f65f88c119a 100644 --- a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp @@ -111,13 +111,13 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (routingInfo.cm()) { + if (cm.isSharded()) { result.appendBool("sharded", true); } else { result.appendBool("sharded", false); - result.append("primary", routingInfo.db().primaryId().toString()); + result.append("primary", cm.dbPrimary().toString()); } int scale = 1; @@ -138,7 +138,7 @@ public: opCtx, nss.db(), nss, - routingInfo, + cm, applyReadWriteConcern( opCtx, this, @@ -261,7 +261,7 @@ public: result.append("maxSize", maxSize / scale); result.append("nindexes", nindexes); result.append("scaleFactor", scale); - result.append("nchunks", (routingInfo.cm() ? routingInfo.cm()->numChunks() : 1)); + result.append("nchunks", cm.numChunks()); result.append("shards", shardStats.obj()); return true; diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index 7359c79e910..9f836644ff5 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -176,10 +176,10 @@ public: CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); } - const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + const auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); if (repl::ReadConcernArgs::get(opCtx).getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && - !opCtx->inMultiDocumentTransaction() && routingInfo.cm()) { + !opCtx->inMultiDocumentTransaction() && cm.isSharded()) { uasserted(ErrorCodes::InvalidOptions, "readConcern level \"snapshot\" prohibited for \"distinct\" command on" " sharded collection"); @@ -191,7 +191,7 @@ public: opCtx, nss.db(), nss, - routingInfo, + cm, applyReadWriteConcern( opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), @@ -226,12 +226,11 @@ public: return true; } - BSONObjComparator bsonCmp( - BSONObj(), - BSONObjComparator::FieldNamesMode::kConsider, - !collation.isEmpty() - ? collator.get() - : (routingInfo.cm() ? routingInfo.cm()->getDefaultCollator() : nullptr)); + BSONObjComparator bsonCmp(BSONObj(), + BSONObjComparator::FieldNamesMode::kConsider, + !collation.isEmpty() + ? collator.get() + : (cm.isSharded() ? cm.getDefaultCollator() : nullptr)); BSONObjSet all = bsonCmp.makeBSONObjSet(); for (const auto& response : shardResponses) { diff --git a/src/mongo/s/commands/cluster_filemd5_cmd.cpp b/src/mongo/s/commands/cluster_filemd5_cmd.cpp index 734695db71b..d351a7be7a3 100644 --- a/src/mongo/s/commands/cluster_filemd5_cmd.cpp +++ b/src/mongo/s/commands/cluster_filemd5_cmd.cpp @@ -85,16 +85,15 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - const auto callShardFn = [opCtx, &nss, &routingInfo](const BSONObj& cmdObj, - const BSONObj& routingQuery) { + const auto callShardFn = [&](const BSONObj& cmdObj, const BSONObj& routingQuery) { auto shardResults = scatterGatherVersionedTargetByRoutingTable(opCtx, nss.db(), nss, - routingInfo, + cm, cmdObj, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, @@ -113,9 +112,9 @@ public: // If the collection is not sharded, or is sharded only on the 'files_id' field, we only // need to target a single shard, because the files' chunks can only be contained in a // single sharded chunk - if (!routingInfo.cm() || - SimpleBSONObjComparator::kInstance.evaluate( - routingInfo.cm()->getShardKeyPattern().toBSON() == BSON("files_id" << 1))) { + if (!cm.isSharded() || + SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == + BSON("files_id" << 1))) { CommandHelpers::filterCommandReplyForPassthrough( callShardFn( applyReadWriteConcern( @@ -130,9 +129,8 @@ public: uassert(ErrorCodes::IllegalOperation, "The GridFS fs.chunks collection must be sharded on either {files_id:1} or " "{files_id:1, n:1}", - SimpleBSONObjComparator::kInstance.evaluate( - routingInfo.cm()->getShardKeyPattern().toBSON() == - BSON("files_id" << 1 << "n" << 1))); + SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == + BSON("files_id" << 1 << "n" << 1))); // Theory of operation: // diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 03d32d8edda..0dba2c8ba40 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -208,27 +208,25 @@ public: const BSONObj& cmdObj = request.body; const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj)); - auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - boost::optional<ChunkManager> chunkMgr; std::shared_ptr<Shard> shard; - if (!routingInfo.cm()) { - shard = routingInfo.db().primary(); - } else { - chunkMgr.emplace(*routingInfo.cm()); - + if (cm.isSharded()) { const BSONObj query = cmdObj.getObjectField("query"); const BSONObj collation = getCollation(cmdObj); const auto let = getLet(cmdObj); const auto rc = getRuntimeConstants(cmdObj); const BSONObj shardKey = - getShardKey(opCtx, *chunkMgr, nss, query, collation, verbosity, let, rc); - const auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); + getShardKey(opCtx, cm, nss, query, collation, verbosity, let, rc); + const auto chunk = cm.findIntersectingChunk(shardKey, collation); shard = uassertStatusOK( Grid::get(opCtx)->shardRegistry()->getShard(opCtx, chunk.getShardId())); + } else { + shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())); } const auto explainCmd = ClusterExplain::wrapAsExplain( @@ -238,10 +236,10 @@ public: Timer timer; BSONObjBuilder bob; - if (chunkMgr) { + if (cm.isSharded()) { _runCommand(opCtx, shard->getId(), - chunkMgr->getVersion(shard->getId()), + cm.getVersion(shard->getId()), boost::none, nss, applyReadWriteConcern(opCtx, false, false, explainCmd), @@ -250,7 +248,7 @@ public: _runCommand(opCtx, shard->getId(), ChunkVersion::UNSHARDED(), - routingInfo.db().databaseVersion(), + cm.dbVersion(), nss, applyReadWriteConcern(opCtx, false, false, explainCmd), &bob); @@ -286,31 +284,30 @@ public: // Append mongoS' runtime constants to the command object before forwarding it to the shard. auto cmdObjForShard = appendRuntimeConstantsToCommandObject(opCtx, cmdObj); - const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - if (!routingInfo.cm()) { + const auto cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + if (!cm.isSharded()) { _runCommand(opCtx, - routingInfo.db().primaryId(), + cm.dbPrimary(), ChunkVersion::UNSHARDED(), - routingInfo.db().databaseVersion(), + cm.dbVersion(), nss, applyReadWriteConcern(opCtx, this, cmdObjForShard), &result); return true; } - const auto chunkMgr = routingInfo.cm(); - const BSONObj query = cmdObjForShard.getObjectField("query"); const BSONObj collation = getCollation(cmdObjForShard); const auto let = getLet(cmdObjForShard); const auto rc = getRuntimeConstants(cmdObjForShard); const BSONObj shardKey = - getShardKey(opCtx, *chunkMgr, nss, query, collation, boost::none, let, rc); - auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation); + getShardKey(opCtx, cm, nss, query, collation, boost::none, let, rc); + + auto chunk = cm.findIntersectingChunk(shardKey, collation); _runCommand(opCtx, chunk.getShardId(), - chunkMgr->getVersion(chunk.getShardId()), + cm.getVersion(chunk.getShardId()), boost::none, nss, applyReadWriteConcern(opCtx, this, cmdObjForShard), diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp index 2d7cf7fdda8..e83722c71e4 100644 --- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp @@ -101,13 +101,11 @@ public: result.append("version", cachedDbInfo.databaseVersion().toBSON()); } else { // Return the collection's information. - auto cachedCollInfo = - uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + const auto cm = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << nss.ns() << " is not sharded.", - cachedCollInfo.cm()); - const auto cm = cachedCollInfo.cm(); - cm->getVersion().appendLegacyWithField(&result, "version"); + cm.isSharded()); + cm.getVersion().appendLegacyWithField(&result, "version"); if (cmdObj["fullMetadata"].trueValue()) { BSONArrayBuilder chunksArrBuilder; @@ -116,9 +114,9 @@ public: LOGV2(22753, "Routing info requested by getShardVersion: {routingInfo}", "Routing info requested by getShardVersion", - "routingInfo"_attr = redact(cm->toString())); + "routingInfo"_attr = redact(cm.toString())); - cm->forEachChunk([&](const auto& chunk) { + cm.forEachChunk([&](const auto& chunk) { if (!exceedsSizeLimit) { BSONArrayBuilder chunkBB(chunksArrBuilder.subarrayStart()); chunkBB.append(chunk.getMin()); diff --git a/src/mongo/s/commands/cluster_list_indexes_cmd.cpp b/src/mongo/s/commands/cluster_list_indexes_cmd.cpp index 8a4d93a3297..79bc042a0d7 100644 --- a/src/mongo/s/commands/cluster_list_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_indexes_cmd.cpp @@ -41,14 +41,14 @@ namespace { bool cursorCommandPassthroughShardWithMinKeyChunk(OperationContext* opCtx, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, BSONObjBuilder* out, const PrivilegeVector& privileges) { auto response = executeCommandAgainstShardWithMinKeyChunk( opCtx, nss, - routingInfo, + cm, CommandHelpers::filterCommandRequestForPassthrough(cmdObj), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); @@ -121,13 +121,13 @@ public: CommandHelpers::handleMarkKillOnClientDisconnect(opCtx); const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); return cursorCommandPassthroughShardWithMinKeyChunk( opCtx, nss, - routingInfo, + cm, applyReadWriteConcern(opCtx, this, cmdObj), &result, {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)}); diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 7908cdec77b..c6a166b942c 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -58,12 +58,12 @@ namespace { auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr, - boost::optional<CachedCollectionRoutingInfo> routingInfo, + const ChunkManager& cm, boost::optional<ExplainOptions::Verbosity> verbosity) { // Populate the collection UUID and the appropriate collation to use. auto nss = parsedMr.getNamespace(); auto [collationObj, uuid] = cluster_aggregation_planner::getCollationAndUUID( - routingInfo, nss, parsedMr.getCollation().get_value_or(BSONObj())); + opCtx, cm, nss, parsedMr.getCollation().get_value_or(BSONObj())); std::unique_ptr<CollatorInterface> resolvedCollator; if (!collationObj.isEmpty()) { @@ -154,9 +154,9 @@ bool runAggregationMapReduce(OperationContext* opCtx, involvedNamespaces.insert(resolvedOutNss); } - auto routingInfo = uassertStatusOK( + auto cm = uassertStatusOK( sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, parsedMr.getNamespace())); - auto expCtx = makeExpressionContext(opCtx, parsedMr, routingInfo, verbosity); + auto expCtx = makeExpressionContext(opCtx, parsedMr, cm, verbosity); const auto pipelineBuilder = [&]() { return map_reduce_common::translateFromMR(parsedMr, expCtx); @@ -176,7 +176,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, cluster_aggregation_planner::AggregationTargeter::make(opCtx, parsedMr.getNamespace(), pipelineBuilder, - routingInfo, + cm, involvedNamespaces, false, // hasChangeStream true); // allowedToPassthrough @@ -187,14 +187,14 @@ bool runAggregationMapReduce(OperationContext* opCtx, // needed in the normal aggregation path. For this translation, though, we need to // build the pipeline to serialize and send to the primary shard. auto serialized = serializeToCommand(cmd, parsedMr, pipelineBuilder().get()); - uassertStatusOK(cluster_aggregation_planner::runPipelineOnPrimaryShard( - expCtx, - namespaces, - targeter.routingInfo->db(), - verbosity, - std::move(serialized), - privileges, - &tempResults)); + uassertStatusOK( + cluster_aggregation_planner::runPipelineOnPrimaryShard(expCtx, + namespaces, + *targeter.cm, + verbosity, + std::move(serialized), + privileges, + &tempResults)); break; } diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index 26b9435f91a..b4157bee9d9 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -103,10 +103,9 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); vector<BSONObj> bounds; if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) { @@ -136,19 +135,19 @@ public: return false; } - if (!cm->getShardKeyPattern().isShardKey(minKey) || - !cm->getShardKeyPattern().isShardKey(maxKey)) { + if (!cm.getShardKeyPattern().isShardKey(minKey) || + !cm.getShardKeyPattern().isShardKey(maxKey)) { errmsg = str::stream() << "shard key bounds " << "[" << minKey << "," << maxKey << ")" - << " are not valid for shard key pattern " << cm->getShardKeyPattern().toBSON(); + << " are not valid for shard key pattern " << cm.getShardKeyPattern().toBSON(); return false; } - minKey = cm->getShardKeyPattern().normalizeShardKey(minKey); - maxKey = cm->getShardKeyPattern().normalizeShardKey(maxKey); + minKey = cm.getShardKeyPattern().normalizeShardKey(minKey); + maxKey = cm.getShardKeyPattern().normalizeShardKey(maxKey); - const auto firstChunk = cm->findIntersectingChunkWithSimpleCollation(minKey); + const auto firstChunk = cm.findIntersectingChunkWithSimpleCollation(minKey); BSONObjBuilder remoteCmdObjB; remoteCmdObjB.append(cmdObj[ClusterMergeChunksCommand::nsField()]); @@ -158,7 +157,7 @@ public: Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString().toString()); remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk.getShardId().toString()); - remoteCmdObjB.append("epoch", cm->getVersion().epoch()); + remoteCmdObjB.append("epoch", cm.getVersion().epoch()); BSONObj remoteResult; @@ -175,10 +174,10 @@ public: Shard::RetryPolicy::kNotIdempotent)); uassertStatusOK(response.commandStatus); - Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(routingInfo), - firstChunk.getShardId()); - CommandHelpers::filterCommandReplyForPassthrough(response.response, &result); + Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection( + nss, firstChunk.getShardId()); + CommandHelpers::filterCommandReplyForPassthrough(response.response, &result); return true; } diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index 398d8fc49c4..01cdb91234e 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -99,10 +99,9 @@ public: const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); const auto toElt = cmdObj["to"]; uassert(ErrorCodes::TypeMismatch, @@ -149,29 +148,29 @@ public: if (!find.isEmpty()) { // find - BSONObj shardKey = uassertStatusOK( - cm->getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); + BSONObj shardKey = + uassertStatusOK(cm.getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); if (shardKey.isEmpty()) { errmsg = str::stream() << "no shard key found in chunk query " << find; return false; } - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(shardKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(shardKey)); } else { // bounds - if (!cm->getShardKeyPattern().isShardKey(bounds[0].Obj()) || - !cm->getShardKeyPattern().isShardKey(bounds[1].Obj())) { + if (!cm.getShardKeyPattern().isShardKey(bounds[0].Obj()) || + !cm.getShardKeyPattern().isShardKey(bounds[1].Obj())) { errmsg = str::stream() << "shard key bounds " << "[" << bounds[0].Obj() << "," << bounds[1].Obj() << ")" - << " are not valid for shard key pattern " << cm->getShardKeyPattern().toBSON(); + << " are not valid for shard key pattern " << cm.getShardKeyPattern().toBSON(); return false; } - BSONObj minKey = cm->getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); - BSONObj maxKey = cm->getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); + BSONObj minKey = cm.getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); + BSONObj maxKey = cm.getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(minKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(minKey)); if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) { errmsg = str::stream() << "no chunk found with the shard key bounds " @@ -188,7 +187,7 @@ public: chunkType.setMin(chunk->getMin()); chunkType.setMax(chunk->getMax()); chunkType.setShard(chunk->getShardId()); - chunkType.setVersion(cm->getVersion()); + chunkType.setVersion(cm.getVersion()); uassertStatusOK(configsvr_client::moveChunk(opCtx, chunkType, diff --git a/src/mongo/s/commands/cluster_passthrough_commands.cpp b/src/mongo/s/commands/cluster_passthrough_commands.cpp index ccb5ee5799d..1919b0e87ae 100644 --- a/src/mongo/s/commands/cluster_passthrough_commands.cpp +++ b/src/mongo/s/commands/cluster_passthrough_commands.cpp @@ -49,24 +49,17 @@ namespace { bool nonShardedCollectionCommandPassthrough(OperationContext* opCtx, StringData dbName, const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const BSONObj& cmdObj, Shard::RetryPolicy retryPolicy, BSONObjBuilder* out) { const StringData cmdName(cmdObj.firstElementFieldName()); uassert(ErrorCodes::IllegalOperation, str::stream() << "Can't do command: " << cmdName << " on a sharded collection", - !routingInfo.cm()); - - auto responses = scatterGatherVersionedTargetByRoutingTable(opCtx, - dbName, - nss, - routingInfo, - cmdObj, - ReadPreferenceSetting::get(opCtx), - retryPolicy, - {}, - {}); + !cm.isSharded()); + + auto responses = scatterGatherVersionedTargetByRoutingTable( + opCtx, dbName, nss, cm, cmdObj, ReadPreferenceSetting::get(opCtx), retryPolicy, {}, {}); invariant(responses.size() == 1); const auto cmdResponse = uassertStatusOK(std::move(responses.front().swResponse)); @@ -119,23 +112,23 @@ public: str::stream() << "Invalid target namespace: " << toNss.ns(), toNss.isValid()); - const auto fromRoutingInfo = uassertStatusOK( + const auto fromCM = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fromNss)); - uassert(13138, "You can't rename a sharded collection", !fromRoutingInfo.cm()); + uassert(13138, "You can't rename a sharded collection", !fromCM.isSharded()); - const auto toRoutingInfo = uassertStatusOK( + const auto toCM = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, toNss)); - uassert(13139, "You can't rename to a sharded collection", !toRoutingInfo.cm()); + uassert(13139, "You can't rename to a sharded collection", !toCM.isSharded()); uassert(13137, "Source and destination collections must be on same shard", - fromRoutingInfo.db().primaryId() == toRoutingInfo.db().primaryId()); + fromCM.dbPrimary() == toCM.dbPrimary()); return nonShardedCollectionCommandPassthrough( opCtx, NamespaceString::kAdminDb, fromNss, - fromRoutingInfo, + fromCM, applyReadWriteConcern( opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), Shard::RetryPolicy::kNoRetry, @@ -173,11 +166,11 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbName, cmdObj)); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::IllegalOperation, "You can't convertToCapped a sharded collection", - !routingInfo.cm()); + !cm.isSharded()); // convertToCapped creates a temp collection and renames it at the end. It will require // special handling for create collection. @@ -185,7 +178,7 @@ public: opCtx, dbName, nss, - routingInfo, + cm, applyReadWriteConcern( opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), Shard::RetryPolicy::kIdempotent, @@ -230,13 +223,11 @@ public: "Performing splitVector across dbs isn't supported via mongos", nss.db() == dbName); - const auto routingInfo = + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); uassert(ErrorCodes::IllegalOperation, str::stream() << "can't do command: " << getName() << " on sharded collection", - !routingInfo.cm()); - - const auto primaryShard = routingInfo.db().primary(); + !cm.isSharded()); // Here, we first filter the command before appending an UNSHARDED shardVersion, because // "shardVersion" is one of the fields that gets filtered out. @@ -245,11 +236,14 @@ public: BSONObj filteredCmdObjWithVersion( appendShardVersion(filteredCmdObj, ChunkVersion::UNSHARDED())); - auto commandResponse = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( + auto shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cm.dbPrimary())); + auto commandResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting::get(opCtx), dbName, - primaryShard->isConfig() ? filteredCmdObj : filteredCmdObjWithVersion, + cm.dbPrimary() == ShardRegistry::kConfigServerShardId ? filteredCmdObj + : filteredCmdObjWithVersion, Shard::RetryPolicy::kIdempotent)); uassert(ErrorCodes::IllegalOperation, @@ -260,7 +254,7 @@ public: if (!commandResponse.writeConcernStatus.isOK()) { appendWriteConcernErrorToCmdResponse( - primaryShard->getId(), commandResponse.response["writeConcernError"], result); + cm.dbPrimary(), commandResponse.response["writeConcernError"], result); } result.appendElementsUnique( CommandHelpers::filterCommandReplyForPassthrough(std::move(commandResponse.response))); diff --git a/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp b/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp index 3f115594172..0e1768c7ed4 100644 --- a/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp +++ b/src/mongo/s/commands/cluster_refine_collection_shard_key_cmd.cpp @@ -54,7 +54,7 @@ public: void typedRun(OperationContext* opCtx) { const NamespaceString& nss = ns(); - const auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); @@ -64,7 +64,7 @@ public: } ConfigsvrRefineCollectionShardKey configsvrRefineCollShardKey( - nss, request().getKey(), routingInfo.cm()->getVersion().epoch()); + nss, request().getKey(), cm.getVersion().epoch()); configsvrRefineCollShardKey.setDbName(request().getDbName()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp index f5ea1d18fee..19d33b3f10b 100644 --- a/src/mongo/s/commands/cluster_split_cmd.cpp +++ b/src/mongo/s/commands/cluster_split_cmd.cpp @@ -130,10 +130,9 @@ public: BSONObjBuilder& result) override { const NamespaceString nss(parseNs(dbname, cmdObj)); - auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); - const auto cm = routingInfo.cm(); const BSONField<BSONObj> findField("find", BSONObj()); const BSONField<BSONArray> boundsField("bounds", BSONArray()); @@ -193,29 +192,29 @@ public: if (!find.isEmpty()) { // find - BSONObj shardKey = uassertStatusOK( - cm->getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); + BSONObj shardKey = + uassertStatusOK(cm.getShardKeyPattern().extractShardKeyFromQuery(opCtx, nss, find)); if (shardKey.isEmpty()) { errmsg = str::stream() << "no shard key found in chunk query " << find; return false; } - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(shardKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(shardKey)); } else if (!bounds.isEmpty()) { // bounds - if (!cm->getShardKeyPattern().isShardKey(bounds[0].Obj()) || - !cm->getShardKeyPattern().isShardKey(bounds[1].Obj())) { + if (!cm.getShardKeyPattern().isShardKey(bounds[0].Obj()) || + !cm.getShardKeyPattern().isShardKey(bounds[1].Obj())) { errmsg = str::stream() << "shard key bounds " << "[" << bounds[0].Obj() << "," << bounds[1].Obj() << ")" - << " are not valid for shard key pattern " << cm->getShardKeyPattern().toBSON(); + << " are not valid for shard key pattern " << cm.getShardKeyPattern().toBSON(); return false; } - BSONObj minKey = cm->getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); - BSONObj maxKey = cm->getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); + BSONObj minKey = cm.getShardKeyPattern().normalizeShardKey(bounds[0].Obj()); + BSONObj maxKey = cm.getShardKeyPattern().normalizeShardKey(bounds[1].Obj()); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(minKey)); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(minKey)); if (chunk->getMin().woCompare(minKey) != 0 || chunk->getMax().woCompare(maxKey) != 0) { errmsg = str::stream() << "no chunk found with the shard key bounds " @@ -224,15 +223,15 @@ public: } } else { // middle - if (!cm->getShardKeyPattern().isShardKey(middle)) { + if (!cm.getShardKeyPattern().isShardKey(middle)) { errmsg = str::stream() << "new split key " << middle << " is not valid for shard key pattern " - << cm->getShardKeyPattern().toBSON(); + << cm.getShardKeyPattern().toBSON(); return false; } - middle = cm->getShardKeyPattern().normalizeShardKey(middle); - chunk.emplace(cm->findIntersectingChunkWithSimpleCollation(middle)); + middle = cm.getShardKeyPattern().normalizeShardKey(middle); + chunk.emplace(cm.findIntersectingChunkWithSimpleCollation(middle)); if (chunk->getMin().woCompare(middle) == 0 || chunk->getMax().woCompare(middle) == 0) { errmsg = str::stream() @@ -251,7 +250,7 @@ public: : selectMedianKey(opCtx, chunk->getShardId(), nss, - cm->getShardKeyPattern(), + cm.getShardKeyPattern(), ChunkRange(chunk->getMin(), chunk->getMax())); LOGV2(22758, @@ -266,15 +265,13 @@ public: shardutil::splitChunkAtMultiplePoints(opCtx, chunk->getShardId(), nss, - cm->getShardKeyPattern(), - cm->getVersion(), + cm.getShardKeyPattern(), + cm.getVersion(), ChunkRange(chunk->getMin(), chunk->getMax()), {splitPoint})); - // This invalidation is only necessary so that auto-split can begin to track statistics for - // the chunks produced after the split instead of the single original chunk. - Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(routingInfo), - chunk->getShardId()); + Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, + chunk->getShardId()); return true; } diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 9254504dfd9..72e38cd19fc 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -39,7 +39,6 @@ #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/query/cluster_cursor_manager.h" diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 56103395066..d85252e309c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -142,9 +142,9 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx, void updateHostsTargetedMetrics(OperationContext* opCtx, const NamespaceString& executionNss, - boost::optional<CachedCollectionRoutingInfo> executionNsRoutingInfo, + const boost::optional<ChunkManager>& cm, stdx::unordered_set<NamespaceString> involvedNamespaces) { - if (!executionNsRoutingInfo) + if (!cm) return; // Create a set of ShardIds that own a chunk belonging to any of the collections involved in @@ -153,9 +153,9 @@ void updateHostsTargetedMetrics(OperationContext* opCtx, std::set<ShardId> shardsOwningChunks = [&]() { std::set<ShardId> shardsIds; - if (executionNsRoutingInfo->cm()) { + if (cm->isSharded()) { std::set<ShardId> shardIdsForNs; - executionNsRoutingInfo->cm()->getAllShardIds(&shardIdsForNs); + cm->getAllShardIds(&shardIdsForNs); for (const auto& shardId : shardIdsForNs) { shardsIds.insert(shardId); } @@ -165,11 +165,11 @@ void updateHostsTargetedMetrics(OperationContext* opCtx, if (nss == executionNss) continue; - const auto resolvedNsRoutingInfo = + const auto resolvedNsCM = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - if (resolvedNsRoutingInfo.cm()) { + if (resolvedNsCM.isSharded()) { std::set<ShardId> shardIdsForNs; - resolvedNsRoutingInfo.cm()->getAllShardIds(&shardIdsForNs); + resolvedNsCM.getAllShardIds(&shardIdsForNs); for (const auto& shardId : shardIdsForNs) { shardsIds.insert(shardId); } @@ -219,9 +219,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, !request.getCollectionUUID()); const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) { - const auto resolvedNsRoutingInfo = - uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - return bool(resolvedNsRoutingInfo.cm()); + const auto resolvedNsCM = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + return resolvedNsCM.isSharded(); }; liteParsedPipeline.verifyIsSupported( @@ -235,11 +234,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // $changeStream, we allow the operation to continue so that stream cursors can be established // on the given namespace before the database or collection is actually created. If the database // does not exist and this is not a $changeStream, then we return an empty cursor. - boost::optional<CachedCollectionRoutingInfo> routingInfo; + boost::optional<ChunkManager> cm; auto executionNsRoutingInfoStatus = sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); if (executionNsRoutingInfoStatus.isOK()) { - routingInfo = std::move(executionNsRoutingInfoStatus.getValue()); + cm = std::move(executionNsRoutingInfoStatus.getValue()); } else if (!(hasChangeStream && executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { appendEmptyResultSetWithStatus( @@ -261,7 +260,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } return cluster_aggregation_planner::getCollationAndUUID( - routingInfo, namespaces.executionNss, request.getCollation()); + opCtx, cm, namespaces.executionNss, request.getCollation()); }(); // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, @@ -280,7 +279,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, opCtx, namespaces.executionNss, pipelineBuilder, - routingInfo, + cm, involvedNamespaces, hasChangeStream, liteParsedPipeline.allowedToPassthroughFromMongos()); @@ -306,7 +305,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return cluster_aggregation_planner::runPipelineOnPrimaryShard( expCtx, namespaces, - targeter.routingInfo->db(), + *targeter.cm, request.getExplain(), request.serializeToCommandObj(), privileges, @@ -352,7 +351,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, }(); if (status.isOK()) { - updateHostsTargetedMetrics(opCtx, namespaces.executionNss, routingInfo, involvedNamespaces); + updateHostsTargetedMetrics(opCtx, namespaces.executionNss, cm, involvedNamespaces); // Report usage statistics for each stage in the pipeline. liteParsedPipeline.tickGlobalStageCounters(); } diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 4fb474d2f8f..2c74736235a 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -29,18 +29,12 @@ #pragma once -#include <memory> -#include <vector> - #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" -#include "mongo/s/async_requests_sender.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/commands/strategy.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/document_source_merge_cursors.h" diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 3f589ba2df5..25b7a4e71f2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -162,7 +162,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const ClusterAggregate::Namespaces& namespaces, Document serializedCommand, long long batchSize, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const boost::optional<ChunkManager>& cm, DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result, const PrivilegeVector& privileges, @@ -200,12 +200,10 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we // therefore must have a valid routing table. - invariant(routingInfo); + invariant(cm); - const ShardId mergingShardId = pickMergingShard(opCtx, - shardDispatchResults.needsPrimaryShardMerge, - targetedShards, - routingInfo->db().primaryId()); + const ShardId mergingShardId = pickMergingShard( + opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cm->dbPrimary()); const bool mergingShardContributesData = std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != targetedShards.end(); @@ -481,8 +479,11 @@ ClusterClientCursorGuard convertPipelineToRouterStages( /** * Returns the output of the listCollections command filtered to the namespace 'nss'. */ -BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) { - ScopedDbConnection conn(primaryShard->getConnString()); +BSONObj getUnshardedCollInfo(OperationContext* opCtx, + const ShardId& shardId, + const NamespaceString& nss) { + auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); + ScopedDbConnection conn(shard->getConnString()); std::list<BSONObj> all = conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); if (all.empty()) { @@ -492,7 +493,6 @@ BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& n return all.front(); } - /** * Returns the collection default collation or the simple collator if there is no default. If the * collection does not exist, then returns an empty BSON Object. @@ -551,7 +551,7 @@ AggregationTargeter AggregationTargeter::make( OperationContext* opCtx, const NamespaceString& executionNss, const std::function<std::unique_ptr<Pipeline, PipelineDeleter>()> buildPipelineFn, - boost::optional<CachedCollectionRoutingInfo> routingInfo, + boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, bool allowedToPassthrough) { @@ -559,9 +559,9 @@ AggregationTargeter AggregationTargeter::make( // Check if any of the involved collections are sharded. bool involvesShardedCollections = [&]() { for (const auto& nss : involvedNamespaces) { - const auto resolvedNsRoutingInfo = + const auto resolvedNsCM = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - if (resolvedNsRoutingInfo.cm()) { + if (resolvedNsCM.isSharded()) { return true; } } @@ -573,7 +573,7 @@ AggregationTargeter AggregationTargeter::make( sharded_agg_helpers::mustRunOnAllShards(executionNss, hasChangeStream); // If we don't have a routing table, then this is a $changeStream which must run on all shards. - invariant(routingInfo || (mustRunOnAll && hasChangeStream)); + invariant(cm || (mustRunOnAll && hasChangeStream)); // A pipeline is allowed to passthrough to the primary shard iff the following conditions are // met: @@ -585,20 +585,20 @@ AggregationTargeter AggregationTargeter::make( // $currentOp. // 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions // needs to include information about users that can only be deduced on mongos. - if (routingInfo && !routingInfo->cm() && !mustRunOnAll && allowedToPassthrough && + if (cm && !cm->isSharded() && !mustRunOnAll && allowedToPassthrough && !involvesShardedCollections) { - return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, routingInfo}; + return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cm}; } else { auto pipeline = buildPipelineFn(); auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired : TargetingPolicy::kAnyShard; - return AggregationTargeter{policy, std::move(pipeline), routingInfo}; + return AggregationTargeter{policy, std::move(pipeline), cm}; } } Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& expCtx, const ClusterAggregate::Namespaces& namespaces, - const CachedDatabaseInfo& dbInfo, + const ChunkManager& cm, boost::optional<ExplainOptions::Verbosity> explain, Document serializedCommand, const PrivilegeVector& privileges, @@ -615,7 +615,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& sharded_agg_helpers::createPassthroughCommandForShard( expCtx, serializedCommand, explain, boost::none, nullptr, BSONObj()))); - const auto shardId = dbInfo.primary()->getId(); + const auto shardId = cm.dbPrimary(); const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId) ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) : std::move(cmdObj); @@ -624,7 +624,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), namespaces.executionNss.db().toString(), - {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}}, + {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}}, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); auto response = ars.next(); @@ -746,7 +746,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, namespaces, serializedCommand, batchSize, - targeter.routingInfo, + targeter.cm, std::move(shardDispatchResults), result, privileges, @@ -754,11 +754,12 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, } std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + OperationContext* opCtx, + const boost::optional<ChunkManager>& cm, const NamespaceString& nss, const BSONObj& collation) { - const bool collectionIsSharded = (routingInfo && routingInfo->cm()); - const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); + const bool collectionIsSharded = (cm && cm->isSharded()); + const bool collectionIsNotSharded = (cm && !cm->isSharded()); // If this is a collectionless aggregation, we immediately return the user- // defined collation if one exists, or an empty BSONObj otherwise. Collectionless aggregations @@ -770,14 +771,13 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( } // If the collection is unsharded, obtain collInfo from the primary shard. - const auto unshardedCollInfo = collectionIsNotSharded - ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss) - : BSONObj(); + const auto unshardedCollInfo = + collectionIsNotSharded ? getUnshardedCollInfo(opCtx, cm->dbPrimary(), nss) : BSONObj(); // Return the collection UUID if available, or boost::none otherwise. const auto getUUID = [&]() -> auto { if (collectionIsSharded) { - return routingInfo->cm()->getUUID(); + return cm->getUUID(); } else { return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"] ? boost::optional<UUID>{uassertStatusOK( @@ -796,9 +796,8 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( if (collectionIsNotSharded) { return getDefaultCollationForUnshardedCollection(unshardedCollInfo); } else { - return routingInfo->cm()->getDefaultCollator() - ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON() - : CollationSpec::kSimpleSpec; + return cm->getDefaultCollator() ? cm->getDefaultCollator()->getSpec().toBSON() + : CollationSpec::kSimpleSpec; } }; diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index e5b9d968223..5beaed7b8f2 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -62,7 +62,8 @@ ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, * collectionless namespace. */ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + OperationContext* opCtx, + const boost::optional<ChunkManager>& cm, const NamespaceString& nss, const BSONObj& collation); @@ -78,7 +79,7 @@ struct AggregationTargeter { OperationContext* opCtx, const NamespaceString& executionNss, const std::function<std::unique_ptr<Pipeline, PipelineDeleter>()> buildPipelineFn, - boost::optional<CachedCollectionRoutingInfo> routingInfo, + boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, bool allowedToPassthrough); @@ -90,12 +91,12 @@ struct AggregationTargeter { } policy; std::unique_ptr<Pipeline, PipelineDeleter> pipeline; - boost::optional<CachedCollectionRoutingInfo> routingInfo; + boost::optional<ChunkManager> cm; }; Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& expCtx, const ClusterAggregate::Namespaces& namespaces, - const CachedDatabaseInfo& dbInfo, + const ChunkManager& cm, boost::optional<ExplainOptions::Verbosity> explain, Document serializedCommand, const PrivilegeVector& privileges, diff --git a/src/mongo/s/query/cluster_exchange_test.cpp b/src/mongo/s/query/cluster_exchange_test.cpp index 8324a56e7e0..bb5ef977d46 100644 --- a/src/mongo/s/query/cluster_exchange_test.cpp +++ b/src/mongo/s/query/cluster_exchange_test.cpp @@ -44,8 +44,8 @@ #include "mongo/util/scopeguard.h" namespace mongo { - namespace { + using MergeStrategyDescriptor = DocumentSourceMerge::MergeStrategyDescriptor; using WhenMatched = MergeStrategyDescriptor::WhenMatched; using WhenNotMatched = MergeStrategyDescriptor::WhenNotMatched; @@ -608,5 +608,6 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) { future.default_timed_get(); } + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8dabfd0bf63..3996e01c326 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -172,7 +172,7 @@ StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards( */ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( OperationContext* opCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, const std::set<ShardId>& shardIds, const CanonicalQuery& query, bool appendGeoNearDistanceProjection) { @@ -202,12 +202,11 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( BSONObjBuilder cmdBuilder; qrToForward->asFindCommand(&cmdBuilder); - if (routingInfo.cm()) { - routingInfo.cm()->getVersion(shardId).appendToCommand(&cmdBuilder); + if (cm.isSharded()) { + cm.getVersion(shardId).appendToCommand(&cmdBuilder); } else if (!query.nss().isOnInternalDb()) { ChunkVersion::UNSHARDED().appendToCommand(&cmdBuilder); - auto dbVersion = routingInfo.db().databaseVersion(); - cmdBuilder.append("databaseVersion", dbVersion.toBSON()); + cmdBuilder.append("databaseVersion", cm.dbVersion().toBSON()); } if (opCtx->getTxnNumber()) { @@ -221,11 +220,11 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( } void updateNumHostsTargetedMetrics(OperationContext* opCtx, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, int nTargetedShards) { int nShardsOwningChunks = 0; - if (routingInfo.cm()) { - nShardsOwningChunks = routingInfo.cm()->getNShardsOwningChunks(); + if (cm.isSharded()) { + nShardsOwningChunks = cm.getNShardsOwningChunks(); } auto targetType = NumHostsTargetedMetrics::get(opCtx).parseTargetType( @@ -237,12 +236,12 @@ void updateNumHostsTargetedMetrics(OperationContext* opCtx, CursorId runQueryWithoutRetrying(OperationContext* opCtx, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, - const CachedCollectionRoutingInfo& routingInfo, + const ChunkManager& cm, std::vector<BSONObj>* results, bool* partialResultsReturned) { // Get the set of shards on which we will run the query. auto shardIds = getTargetedShardsForQuery(query.getExpCtx(), - routingInfo, + cm, query.getQueryRequest().getFilter(), query.getQueryRequest().getCollation()); @@ -306,8 +305,8 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // Construct the requests that we will use to establish cursors on the targeted shards, // attaching the shardVersion and txnNumber, if necessary. - auto requests = constructRequestsForShards( - opCtx, routingInfo, shardIds, query, appendGeoNearDistanceProjection); + auto requests = + constructRequestsForShards(opCtx, cm, shardIds, query, appendGeoNearDistanceProjection); // Establish the cursors with a consistent shardVersion across shards. params.remotes = establishCursors(opCtx, @@ -398,7 +397,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorExhausted = true; if (shardIds.size() > 0) { - updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size()); + updateNumHostsTargetedMetrics(opCtx, cm, shardIds.size()); } return CursorId(0); } @@ -419,7 +418,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorid = cursorId; if (shardIds.size() > 0) { - updateNumHostsTargetedMetrics(opCtx, routingInfo, shardIds.size()); + updateNumHostsTargetedMetrics(opCtx, cm, shardIds.size()); } return cursorId; diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index 729a42edd19..1fedd6aea16 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -82,4 +82,5 @@ void killRemoteCursor(OperationContext* opCtx, executor::TaskExecutor* executor, RemoteCursor&& cursor, const NamespaceString& nss); + } // namespace mongo diff --git a/src/mongo/s/query/sharded_agg_test_fixture.h b/src/mongo/s/query/sharded_agg_test_fixture.h index f72d6c2f6ec..0bfedd9f5db 100644 --- a/src/mongo/s/query/sharded_agg_test_fixture.h +++ b/src/mongo/s/query/sharded_agg_test_fixture.h @@ -109,8 +109,8 @@ public: return response; }()); - auto routingInfo = future.default_timed_get(); - ASSERT(routingInfo->cm()); + const auto cm = future.default_timed_get(); + ASSERT(cm->isSharded()); } protected: diff --git a/src/mongo/s/sessions_collection_sharded.cpp b/src/mongo/s/sessions_collection_sharded.cpp index d41f71bd617..060c1158dbd 100644 --- a/src/mongo/s/sessions_collection_sharded.cpp +++ b/src/mongo/s/sessions_collection_sharded.cpp @@ -62,19 +62,17 @@ BSONObj lsidQuery(const LogicalSessionId& lsid) { std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { - auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, NamespaceString::kLogicalSessionsNamespace)); - auto cm = routingInfo.cm(); - uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace << " is not sharded", - cm); + cm.isSharded()); std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard; for (const auto& session : sessions) { sessionIdsByOwningShard.emplace( - cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + cm.findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), session); } @@ -89,19 +87,17 @@ std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwnin std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard( OperationContext* opCtx, const LogicalSessionRecordSet& sessions) { - auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, NamespaceString::kLogicalSessionsNamespace)); - auto cm = routingInfo.cm(); - uassert(ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace << " is not sharded", - cm); + cm.isSharded()); std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard; for (const auto& session : sessions) { sessionsByOwningShard.emplace( - cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + cm.findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), session); } @@ -124,12 +120,11 @@ void SessionsCollectionSharded::checkSessionsCollectionExists(OperationContext* Grid::get(opCtx)->isShardingInitialized()); // If the collection doesn't exist, fail. Only the config servers generate it. - const auto routingInfo = uassertStatusOK( + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( opCtx, NamespaceString::kLogicalSessionsNamespace)); - uassert( - ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", routingInfo.cm()); + uassert(ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", cm.isSharded()); } void SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index 33ceee5e0d5..05db6bd6ce9 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -82,7 +82,6 @@ Status initializeGlobalShardingState(OperationContext* opCtx, rpc::ShardingEgressMetadataHookBuilder hookBuilder, boost::optional<size_t> taskExecutorPoolSize); - /** * Loads cluster ID and waits for the reload of the Shard Registry. */ diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index b9bd33d9782..4a492481be4 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -39,7 +39,6 @@ #include "mongo/db/ops/write_ops_parsers.h" #include "mongo/s/client/num_hosts_targeted_metrics.h" #include "mongo/s/cluster_commands_helpers.h" -#include "mongo/s/grid.h" #include "mongo/s/transaction_router.h" #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index ec79a4ec1e4..f7189efdfe9 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -171,7 +171,7 @@ BSONObj getUpdateExprForTargeting(const boost::intrusive_ptr<ExpressionContext> * { _id : { $lt : 30 } } => false * { foo : <anything> } => false */ -bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager* cm) { +bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const ChunkManager& cm) { auto shardKey = kVirtualIdShardKey.extractShardKeyFromQuery(query); BSONElement idElt = shardKey["_id"]; @@ -179,9 +179,9 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, const return false; } - if (CollationIndexKey::isCollatableType(idElt.type()) && cm && + if (CollationIndexKey::isCollatableType(idElt.type()) && cm.isSharded() && !query.getQueryRequest().getCollation().isEmpty() && - !CollatorInterface::collatorsMatch(query.getCollator(), cm->getDefaultCollator())) { + !CollatorInterface::collatorsMatch(query.getCollator(), cm.getDefaultCollator())) { // The collation applies to the _id field, but the user specified a collation which doesn't // match the collection default. @@ -195,7 +195,7 @@ bool isExactIdQuery(OperationContext* opCtx, const NamespaceString& nss, const BSONObj query, const BSONObj collation, - const ChunkManager* cm) { + const ChunkManager& cm) { auto qr = std::make_unique<QueryRequest>(nss); qr->setFilter(query); if (!collation.isEmpty()) { @@ -242,10 +242,9 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA, return CompareResult_GTE; } -ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, - const ShardId& shardId) { - if (routingInfo.cm()) { - return routingInfo.cm()->getVersion(shardId); +ChunkVersion getShardVersion(const ChunkManager& cm, const ShardId& shardId) { + if (cm.isSharded()) { + return cm.getVersion(shardId); } return ChunkVersion::UNSHARDED(); @@ -262,7 +261,7 @@ ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo, * Note that the signature here is weird since our cached map of chunk versions is stored in a * ChunkManager or is implicit in the primary shard of the collection. */ -CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo, +CompareResult compareAllShardVersions(const ChunkManager& cm, const StaleShardVersionMap& remoteShardVersions) { CompareResult finalResult = CompareResult_GTE; @@ -274,7 +273,7 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing try { // Throws b/c shard constructor throws - cachedShardVersion = getShardVersion(routingInfo, shardId); + cachedShardVersion = getShardVersion(cm, shardId); } catch (const DBException& ex) { LOGV2_WARNING(22915, "could not lookup shard {shardId} in local cache, shard metadata may " @@ -301,9 +300,8 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing return finalResult; } -CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo, - const DatabaseVersion& remoteDbVersion) { - DatabaseVersion cachedDbVersion = routingInfo.db().databaseVersion(); +CompareResult compareDbVersions(const ChunkManager& cm, const DatabaseVersion& remoteDbVersion) { + DatabaseVersion cachedDbVersion = cm.dbVersion(); // Db may have been dropped if (cachedDbVersion.getUuid() != remoteDbVersion.getUuid()) { @@ -321,15 +319,16 @@ CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo, /** * Whether or not the manager/primary pair is different from the other manager/primary pair. */ -bool isMetadataDifferent(const ChunkManager* managerA, +bool isMetadataDifferent(const ChunkManager& managerA, const DatabaseVersion dbVersionA, - const ChunkManager* managerB, + const ChunkManager& managerB, const DatabaseVersion dbVersionB) { - if ((managerA && !managerB) || (!managerA && managerB)) + if ((managerA.isSharded() && !managerB.isSharded()) || + (!managerA.isSharded() && managerB.isSharded())) return true; - if (managerA) { - return managerA->getVersion() != managerB->getVersion(); + if (managerA.isSharded()) { + return managerA.getVersion() != managerB.getVersion(); } return !databaseVersion::equal(dbVersionA, dbVersionB); @@ -346,13 +345,13 @@ ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, void ChunkManagerTargeter::_init(OperationContext* opCtx) { createShardDatabase(opCtx, _nss.db()); - _routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); + _cm = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); if (_targetEpoch) { - uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm()); + uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _cm->isSharded()); uassert(ErrorCodes::StaleEpoch, "Collection epoch has changed", - _routingInfo->cm()->getVersion().epoch() == *_targetEpoch); + _cm->getVersion().epoch() == *_targetEpoch); } } @@ -364,8 +363,8 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, const BSONObj& doc) const { BSONObj shardKey; - if (_routingInfo->cm()) { - shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); + if (_cm->isSharded()) { + shardKey = _cm->getShardKeyPattern().extractShardKeyFromDoc(doc); // The shard key would only be empty after extraction if we encountered an error case, such // as the shard key possessing an array value or array descendants. If the shard key // presented to the targeter was empty, we would emplace the missing fields, and the @@ -381,9 +380,7 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize())); } - return ShardEndpoint(_routingInfo->db().primary()->getId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()); + return ShardEndpoint(_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()); } std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx, @@ -405,13 +402,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard. - if (!_routingInfo->cm()) { - return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()}}; + if (!_cm->isSharded()) { + return std::vector<ShardEndpoint>{ + {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}}; } - const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern(); + const auto& shardKeyPattern = _cm->getShardKeyPattern(); const auto collation = write_ops::collationOf(updateOp); auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx, @@ -468,7 +464,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* "a single shard (and have the simple collation), but this update targeted " << endPoints.size() << " shards. Update request: " << updateOp.toBSON() << ", shard key pattern: " << shardKeyPattern.toString(), - updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, _routingInfo->cm())); + updateOp.getMulti() || isExactIdQuery(opCtx, _nss, query, collation, *_cm)); // If the request is {multi:false}, then this is a single op-style update which we are // broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics. @@ -492,13 +488,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* itemRef.getRuntimeConstants()); BSONObj shardKey; - if (_routingInfo->cm()) { + if (_cm->isSharded()) { // Sharded collections have the following further requirements for targeting: // // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - shardKey = - uassertStatusOK(_routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery( - expCtx, deleteOp.getQ())); + shardKey = uassertStatusOK( + _cm->getShardKeyPattern().extractShardKeyFromQuery(expCtx, deleteOp.getQ())); } // Target the shard key or delete query @@ -530,10 +525,9 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* str::stream() << "A single delete on a sharded collection must contain an exact match " "on _id (and have the collection default collation) or contain the " "shard key (and have the simple collation). Delete request: " - << deleteOp.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString(), - !_routingInfo->cm() || deleteOp.getMulti() || - isExactIdQuery(opCtx, *cq, _routingInfo->cm())); + << deleteOp.toBSON() + << ", shard key pattern: " << _cm->getShardKeyPattern().toString(), + !_cm->isSharded() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, *_cm)); return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation)); } @@ -542,22 +536,21 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( boost::intrusive_ptr<ExpressionContext> expCtx, const BSONObj& query, const BSONObj& collation) const { - if (!_routingInfo->cm()) { - return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()}}; + if (!_cm->isSharded()) { + return std::vector<ShardEndpoint>{ + {_cm->dbPrimary(), ChunkVersion::UNSHARDED(), _cm->dbVersion()}}; } std::set<ShardId> shardIds; try { - _routingInfo->cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); + _cm->getShardIdsForQuery(expCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); + endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId)); } return endpoints; @@ -567,8 +560,8 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s const BSONObj& collation, long long estDataSize) const { try { - auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation); - return {{chunk.getShardId(), _routingInfo->cm()->getVersion(chunk.getShardId())}}; + auto chunk = _cm->findIntersectingChunk(shardKey, collation); + return {{chunk.getShardId(), _cm->getVersion(chunk.getShardId())}}; } catch (const DBException& ex) { return ex.toStatus(); } @@ -578,14 +571,14 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const { // This function is only called if doing a multi write that targets more than one shard. This // implies the collection is sharded, so we should always have a chunk manager. - invariant(_routingInfo->cm()); + invariant(_cm->isSharded()); std::vector<ShardId> shardIds; Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { - endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); + endpoints.emplace_back(std::move(shardId), _cm->getVersion(shardId)); } return endpoints; @@ -605,7 +598,7 @@ void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint, ChunkVersion remoteShardVersion; if (!staleInfo.getVersionWanted()) { // If we don't have a vWanted sent, assume the version is higher than our current version. - remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName); + remoteShardVersion = getShardVersion(*_cm, endpoint.shardName); remoteShardVersion.incMajor(); } else { remoteShardVersion = *staleInfo.getVersionWanted(); @@ -637,7 +630,7 @@ void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint, DatabaseVersion remoteDbVersion; if (!staleInfo.getVersionWanted()) { // If the vWanted is not set, assume the wanted version is higher than our current version. - remoteDbVersion = _routingInfo->db().databaseVersion(); + remoteDbVersion = _cm->dbVersion(); remoteDbVersion = databaseVersion::makeIncremented(remoteDbVersion); } else { remoteDbVersion = *staleInfo.getVersionWanted(); @@ -690,8 +683,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha // Get the latest metadata information from the cache if there were issues // - auto lastManager = _routingInfo->cm(); - auto lastDbVersion = _routingInfo->db().databaseVersion(); + auto lastManager = *_cm; + auto lastDbVersion = _cm->dbVersion(); _init(opCtx); @@ -710,8 +703,8 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha // If we couldn't target, we might need to refresh if we haven't remotely refreshed // the metadata since we last got it from the cache. - bool alreadyRefreshed = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + bool alreadyRefreshed = + isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { @@ -720,13 +713,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } else if (!_remoteShardVersions.empty()) { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions); + CompareResult result = compareAllShardVersions(*_cm, _remoteShardVersions); LOGV2_DEBUG(22913, 4, @@ -743,13 +735,12 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } else if (_remoteDbVersion) { // If we got stale database versions from the remote shard, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues - CompareResult result = compareDbVersions(*_routingInfo, *_remoteDbVersion); + CompareResult result = compareDbVersions(*_cm, *_remoteDbVersion); LOGV2_DEBUG(22914, 4, @@ -767,18 +758,17 @@ void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasCha return; } - *wasChanged = isMetadataDifferent( - lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); + *wasChanged = isMetadataDifferent(lastManager, lastDbVersion, *_cm, _cm->dbVersion()); } } bool ChunkManagerTargeter::endpointIsConfigServer() const { - if (!_routingInfo->cm()) { - return _routingInfo->db().primaryId() == ShardRegistry::kConfigServerShardId; + if (!_cm->isSharded()) { + return _cm->dbPrimary() == ShardRegistry::kConfigServerShardId; } std::set<ShardId> shardIds; - _routingInfo->cm()->getAllShardIds(&shardIds); + _cm->getAllShardIds(&shardIds); if (std::any_of(shardIds.begin(), shardIds.end(), [](const auto& shardId) { return shardId == ShardRegistry::kConfigServerShardId; @@ -792,8 +782,8 @@ bool ChunkManagerTargeter::endpointIsConfigServer() const { } int ChunkManagerTargeter::getNShardsOwningChunks() const { - if (_routingInfo->cm()) { - return _routingInfo->cm()->getNShardsOwningChunks(); + if (_cm->isSharded()) { + return _cm->getNShardsOwningChunks(); } return 0; diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index 1e62bc2eeb5..caaa8884399 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -35,7 +35,7 @@ #include "mongo/bson/bsonobj_comparator_interface.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/ns_targeter.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -151,7 +151,7 @@ private: bool _needsTargetingRefresh; // The latest loaded routing cache entry - boost::optional<CachedCollectionRoutingInfo> _routingInfo; + boost::optional<ChunkManager> _cm; // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and // find a new epoch, we immediately throw a StaleEpoch exception. diff --git a/src/mongo/util/lru_cache.h b/src/mongo/util/lru_cache.h index adb35affa7c..6cd4e8b691b 100644 --- a/src/mongo/util/lru_cache.h +++ b/src/mongo/util/lru_cache.h @@ -58,12 +58,12 @@ class LRUCache { LRUCache(const LRUCache&) = delete; LRUCache& operator=(const LRUCache&) = delete; -public: - explicit LRUCache(std::size_t maxSize) : _maxSize(maxSize) {} - LRUCache(LRUCache&&) = delete; LRUCache& operator=(LRUCache&&) = delete; +public: + explicit LRUCache(std::size_t maxSize) : _maxSize(maxSize) {} + using ListEntry = std::pair<K, V>; using List = std::list<ListEntry>; @@ -88,27 +88,27 @@ public: */ boost::optional<std::pair<K, V>> add(const K& key, V entry) { // If the key already exists, delete it first. - auto i = this->_map.find(key); - if (i != this->_map.end()) { - this->_list.erase(i->second); + auto i = _map.find(key); + if (i != _map.end()) { + _list.erase(i->second); } - this->_list.push_front(std::make_pair(key, std::move(entry))); - this->_map[key] = this->_list.begin(); + _list.push_front(std::make_pair(key, std::move(entry))); + _map[key] = _list.begin(); // If the store has grown beyond its allowed size, // evict the least recently used entry. - if (this->size() > this->_maxSize) { - auto pair = std::move(this->_list.back()); + if (size() > _maxSize) { + auto pair = std::move(_list.back()); - this->_map.erase(pair.first); - this->_list.pop_back(); + _map.erase(pair.first); + _list.pop_back(); - invariant(this->size() <= this->_maxSize); + invariant(size() <= _maxSize); return std::move(pair); } - invariant(this->size() <= this->_maxSize); + invariant(size() <= _maxSize); return boost::none; } @@ -116,7 +116,7 @@ public: * Finds an element in the cache by key. */ iterator find(const K& key) { - return this->promote(key); + return promote(key); } /** @@ -129,11 +129,11 @@ public: * properly. */ const_iterator cfind(const K& key) const { - auto it = this->_map.find(key); + auto it = _map.find(key); // TODO(SERVER-28890): Remove the function-style cast when MSVC's // `std::list< ... >::iterator` implementation doesn't conflict with their `/Zc:ternary` // flag support . - return (it == this->_map.end()) ? this->end() : const_iterator(it->second); + return (it == _map.end()) ? end() : const_iterator(it->second); } /** @@ -141,8 +141,8 @@ public: * to the least recently used element. */ iterator promote(const K& key) { - auto it = this->_map.find(key); - return (it == this->_map.end()) ? this->end() : this->promote(it->second); + auto it = _map.find(key); + return (it == _map.end()) ? end() : promote(it->second); } /** @@ -150,12 +150,12 @@ public: * recently used element in the cache. */ iterator promote(const iterator& iter) { - if (iter == this->_list.end()) { + if (iter == _list.end()) { return iter; } - this->_list.splice(this->_list.begin(), this->_list, iter); - return this->_list.begin(); + _list.splice(_list.begin(), _list, iter); + return _list.begin(); } /** @@ -163,12 +163,12 @@ public: * least recently used element in the cache. */ const_iterator promote(const const_iterator& iter) { - if (iter == this->_list.cend()) { + if (iter == _list.cend()) { return iter; } - this->_list.splice(this->_list.begin(), this->_list, iter); - return this->_list.begin(); + _list.splice(_list.begin(), _list, iter); + return _list.begin(); } /** @@ -176,13 +176,13 @@ public: * exists. Returns the count of elements erased. */ typename Map::size_type erase(const K& key) { - auto it = this->_map.find(key); - if (it == this->_map.end()) { + auto it = _map.find(key); + if (it == _map.end()) { return 0; } - this->_list.erase(it->second); - this->_map.erase(it); + _list.erase(it->second); + _map.erase(it); return 1; } @@ -192,17 +192,17 @@ public: * element, or the end iterator, if no such element exists. */ iterator erase(iterator it) { - invariant(it != this->_list.end()); - invariant(this->_map.erase(it->first) == 1); - return this->_list.erase(it); + invariant(it != _list.end()); + invariant(_map.erase(it->first) == 1); + return _list.erase(it); } /** * Removes all items from the cache. */ void clear() { - this->_map.clear(); - this->_list.clear(); + _map.clear(); + _list.clear(); } /** @@ -210,64 +210,64 @@ public: * Otherwise, returns false. */ bool hasKey(const K& key) const { - return _map.find(key) != this->_map.end(); + return _map.find(key) != _map.end(); } /** * Returns the number of elements currently in the cache. */ std::size_t size() const { - return this->_list.size(); + return _list.size(); } bool empty() const { - return this->_list.empty(); + return _list.empty(); } /** * Returns an iterator pointing to the most recently used element in the cache. */ iterator begin() { - return this->_list.begin(); + return _list.begin(); } /** * Returns an iterator pointing past the least recently used element in the cache. */ iterator end() { - return this->_list.end(); + return _list.end(); } /** * Returns a const_iterator pointing to the most recently used element in the cache. */ const_iterator begin() const { - return this->_list.begin(); + return _list.begin(); } /** * Returns a const_iterafor pointing past the least recently used element in the cache. */ const_iterator end() const { - return this->_list.end(); + return _list.end(); } /** * Returns a const_iterator pointing to the most recently used element in the cache. */ const_iterator cbegin() const { - return this->_list.cbegin(); + return _list.cbegin(); } /** * Returns a const_iterator pointing past the least recently used element in the cache. */ const_iterator cend() const { - return this->_list.cend(); + return _list.cend(); } typename Map::size_type count(const K& key) const { - return this->_map.count(key); + return _map.count(key); } private: |