summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/sharded_agg_helpers.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.h')
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h118
1 files changed, 67 insertions, 51 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 15e0dd51c2e..b8c25a42510 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregate.h"
#include "mongo/s/query/cluster_aggregation_planner.h"
namespace mongo {
@@ -67,63 +68,78 @@ struct DispatchShardPipelineResults {
boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
};
-Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+/**
+ * Targeting info for an aggregation in a sharded cluster: 'policy', 'pipeline' and 'routingInfo'.
+ */
+struct AggregationTargeter {
+ /**
+ * Populates and returns targeting info for an aggregation pipeline on the given namespace
+ * 'executionNss'. The result is wrapped in a StatusWith, so check its status before use.
+ */
+ static StatusWith<AggregationTargeter> make(
+ OperationContext* opCtx,
+ const NamespaceString& executionNss,
+ const std::function<std::unique_ptr<Pipeline, PipelineDeleter>(
+ boost::optional<CachedCollectionRoutingInfo>)> buildPipelineFn,  // by-value callback; the top-level 'const' is not part of the signature
+ stdx::unordered_set<NamespaceString> involvedNamespaces,  // taken by value
+ bool hasChangeStream,
+ bool allowedToPassthrough);
+
+ enum TargetingPolicy {  // unscoped enum; NOTE(review): enumerator meanings are not documented here - confirm in the .cpp
+ kPassthrough,
+ kMongosRequired,
+ kAnyShard,
+ } policy;  // data member declared together with the enum type
+
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;  // owning pointer; makes this struct non-copyable
+ boost::optional<CachedCollectionRoutingInfo> routingInfo;  // optional: may be unset
+};
+// NOTE(review): no doc comment here; by its name and 'dbInfo' this presumably targets the primary shard - confirm.
+Status runPipelineOnPrimaryShard(OperationContext* opCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const CachedDatabaseInfo& dbInfo,
+ boost::optional<ExplainOptions::Verbosity> explain,  // optional; NOTE(review): presumably unset for a non-explain run - confirm
+ Document serializedCommand,  // taken by value
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* out);  // output parameter (non-const pointer)
-bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+/**
+ * Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a
+ * pipeline that is split for merging, or an intact pipeline that must run entirely on mongoS.
+ */
+Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,  // note: unlike its siblings, takes no OperationContext*
+ long long batchSize,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,  // by-value unique_ptr: ownership passes to this function
+ BSONObjBuilder* result,  // output parameter (non-const pointer)
+ const PrivilegeVector& privileges);
-StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
- const NamespaceString& execNss);
+/**
+ * Dispatches the pipeline in 'targeter' to the shards that are involved and, if necessary, merges
+ * the results on either mongoS or a randomly designated shard.
+ */
+Status dispatchPipelineAndMerge(OperationContext* opCtx,
+ sharded_agg_helpers::AggregationTargeter targeter,  // by value: the struct holds a unique_ptr, so callers must pass an rvalue (e.g. std::move)
+ Document serializedCommand,  // taken by value
+ long long batchSize,
+ const ClusterAggregate::Namespaces& namespaces,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* result,  // output parameter (non-const pointer)
+ bool hasChangeStream);
/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and the
- * pipeline that will need to be executed to merge the results from the remotes. If a stale shard
- * version is encountered, refreshes the routing table and tries again.
+ * Returns the "collation" and "uuid" for the collection given by "nss" with the following
+ * semantics:
+ * - The "collation" parameter will be set to the default collation for the collection or the
+ * simple collation if there is no default. If the collection does not exist or if the aggregate
+ * is on the collectionless namespace, this will be set to an empty object.
+ * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections
+ * output for unsharded collections. The UUID will remain unset if the aggregate is on the
+ * collectionless namespace.
*/
-DispatchShardPipelineResults dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj);
-
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation);
-
-std::vector<RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
+std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const std::set<ShardId>& shardIds,
- const BSONObj& cmdObj,
- const AggregationRequest& request,
- const ReadPreferenceSetting& readPref);
-
-BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const cluster_aggregation_planner::SplitPipeline& splitPipeline,
- const BSONObj collationObj,
- const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- const boost::optional<RuntimeConstants>& constants,
- bool needsMerge);
-
-BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<RuntimeConstants>& constants,
- Pipeline* pipeline,
- BSONObj collationObj);
-
-BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
- OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<RuntimeConstants>& constants,
- BSONObj collationObj);
+ const BSONObj& collation);
/**
* For a sharded collection, establishes remote cursors on each shard that may have results, and