summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/sharded_agg_helpers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp38
1 files changed, 22 insertions, 16 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 9c5ea37cbcb..3eaeb1666c8 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -26,11 +26,12 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
-#include "sharded_agg_helpers.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
@@ -58,11 +59,13 @@
#include "mongo/s/query/cluster_query_knobs_gen.h"
#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/router.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/visit_helper.h"
-namespace mongo::sharded_agg_helpers {
+namespace mongo {
+namespace sharded_agg_helpers {
MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors);
@@ -613,7 +616,6 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) {
}
}
-
} // namespace
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
@@ -1349,19 +1351,22 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
ns == NamespaceString::kChangeStreamPreImagesNamespace);
};
- auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
- return shardVersionRetry(
- expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() {
- auto pipelineToTarget = pipeline->clone();
+ if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
+ shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) {
+ auto pipelineToTarget = pipeline->clone();
- if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
- shouldAlwaysAttachLocalCursorForNamespace(expCtx->ns)) {
- return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
- pipelineToTarget.release());
- }
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ pipelineToTarget.release());
+ }
+
+ sharding::router::CollectionRouter router(expCtx->opCtx->getServiceContext(), expCtx->ns);
+ return router.route(
+ expCtx->opCtx,
+ "targeting pipeline to attach cursors"_sd,
+ [&](OperationContext* opCtx, const ChunkManager& cm) {
+ auto pipelineToTarget = pipeline->clone();
- auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns);
- if (cm.isOK() && !cm.getValue().isSharded()) {
+ if (!cm.isSharded()) {
// If the collection is unsharded and we are on the primary, we should be able to
// do a local read. The primary may be moved right after the primary shard check,
// but the local read path will do a db version check before it establishes a cursor
@@ -1371,7 +1376,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
expCtx->mongoProcessInterface->setExpectedShardVersion(
expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED());
setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion(
- expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion());
+ expCtx->opCtx, expCtx->ns, cm.dbVersion());
// During 'attachCursorSourceToPipelineForLocalRead', the expected db version
// must be set. Whether or not that call is succesful, to avoid affecting later
@@ -1424,4 +1429,5 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
});
}
-} // namespace mongo::sharded_agg_helpers
+} // namespace sharded_agg_helpers
+} // namespace mongo