diff options
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.h')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.h | 118 |
1 files changed, 67 insertions, 51 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 15e0dd51c2e..b8c25a42510 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/s/async_requests_sender.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/query/cluster_aggregate.h" #include "mongo/s/query/cluster_aggregation_planner.h" namespace mongo { @@ -67,63 +68,78 @@ struct DispatchShardPipelineResults { boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; }; -Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req); +/** + * This structure contains information for targeting an aggregation pipeline in a sharded cluster. + */ +struct AggregationTargeter { + /** + * Populates and returns targeting info for an aggregation pipeline on the given namespace + * 'executionNss'. + */ + static StatusWith<AggregationTargeter> make( + OperationContext* opCtx, + const NamespaceString& executionNss, + const std::function<std::unique_ptr<Pipeline, PipelineDeleter>( + boost::optional<CachedCollectionRoutingInfo>)> buildPipelineFn, + stdx::unordered_set<NamespaceString> involvedNamespaces, + bool hasChangeStream, + bool allowedToPassthrough); + + enum TargetingPolicy { + kPassthrough, + kMongosRequired, + kAnyShard, + } policy; + + std::unique_ptr<Pipeline, PipelineDeleter> pipeline; + boost::optional<CachedCollectionRoutingInfo> routingInfo; +}; + +Status runPipelineOnPrimaryShard(OperationContext* opCtx, + const ClusterAggregate::Namespaces& namespaces, + const CachedDatabaseInfo& dbInfo, + boost::optional<ExplainOptions::Verbosity> explain, + Document serializedCommand, + const PrivilegeVector& privileges, + BSONObjBuilder* out); -bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe); +/** + * Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a + * pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS. + */ +Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, + long long batchSize, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObjBuilder* result, + const PrivilegeVector& privileges); -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss); +/** + * Dispatches the pipeline in 'targeter' to the shards that are involved, and merges the results if + * necessary on either mongos or a randomly designated shard. + */ +Status dispatchPipelineAndMerge(OperationContext* opCtx, + sharded_agg_helpers::AggregationTargeter targeter, + Document serializedCommand, + long long batchSize, + const ClusterAggregate::Namespaces& namespaces, + const PrivilegeVector& privileges, + BSONObjBuilder* result, + bool hasChangeStream); /** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and the - * pipeline that will need to be executed to merge the results from the remotes. If a stale shard - * version is encountered, refreshes the routing table and tries again. + * Returns the "collation" and "uuid" for the collection given by "nss" with the following + * semantics: + * - The "collation" parameter will be set to the default collation for the collection or the + * simple collation if there is no default. If the collection does not exist or if the aggregate + * is on the collectionless namespace, this will be set to an empty object. + * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections + * output for unsharded collections. The UUID will remain unset if the aggregate is on the + * collectionless namespace. */ -DispatchShardPipelineResults dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - BSONObj collationObj); - -std::set<ShardId> getTargetedShards(OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj shardQuery, - const BSONObj collation); - -std::vector<RemoteCursor> establishShardCursors( - OperationContext* opCtx, +std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID( + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const std::set<ShardId>& shardIds, - const BSONObj& cmdObj, - const AggregationRequest& request, - const ReadPreferenceSetting& readPref); - -BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const cluster_aggregation_planner::SplitPipeline& splitPipeline, - const BSONObj collationObj, - const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, - const boost::optional<RuntimeConstants>& constants, - bool needsMerge); - -BSONObj createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional<RuntimeConstants>& constants, - Pipeline* pipeline, - BSONObj collationObj); - -BSONObj genericTransformForShards(MutableDocument&& cmdForShards, - OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional<RuntimeConstants>& constants, - BSONObj collationObj); + const BSONObj& collation); /** * For a sharded collection, establishes remote cursors on each shard that may have results, and |