diff options
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 38 |
1 files changed, 22 insertions, 16 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 9c5ea37cbcb..3eaeb1666c8 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -26,11 +26,12 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" -#include "sharded_agg_helpers.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/curop.h" #include "mongo/db/pipeline/aggregate_command_gen.h" @@ -58,11 +59,13 @@ #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" +#include "mongo/s/router.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" #include "mongo/util/visit_helper.h" -namespace mongo::sharded_agg_helpers { +namespace mongo { +namespace sharded_agg_helpers { MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors); @@ -613,7 +616,6 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { } } - } // namespace std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( @@ -1349,19 +1351,22 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( ns == NamespaceString::kChangeStreamPreImagesNamespace); }; - auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); - return shardVersionRetry( - expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() { - auto pipelineToTarget = pipeline->clone(); + if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || + shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) { + auto pipelineToTarget = pipeline->clone(); - if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || - shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) { - return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( - pipelineToTarget.release()); - } + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + pipelineToTarget.release()); + } + + sharding::router::CollectionRouter router(expCtx->opCtx->getServiceContext(), expCtx->ns); + return router.route( + expCtx->opCtx, + "targeting pipeline to attach cursors"_sd, + [&](OperationContext* opCtx, const ChunkManager& cm) { + auto pipelineToTarget = pipeline->clone(); - auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); - if (cm.isOK() && !cm.getValue().isSharded()) { + if (!cm.isSharded()) { // If the collection is unsharded and we are on the primary, we should be able to // do a local read. The primary may be moved right after the primary shard check, // but the local read path will do a db version check before it establishes a cursor @@ -1371,7 +1376,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( expCtx->mongoProcessInterface->setExpectedShardVersion( expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED()); setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion( - expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion()); + expCtx->opCtx, expCtx->ns, cm.dbVersion()); // During 'attachCursorSourceToPipelineForLocalRead', the expected db version // must be set. Whether or not that call is succesful, to avoid affecting later @@ -1424,4 +1429,5 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( }); } -} // namespace mongo::sharded_agg_helpers +} // namespace sharded_agg_helpers +} // namespace mongo |