diff options
author | Ian Boros <ian.boros@10gen.com> | 2019-01-03 16:02:22 -0500 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2019-01-29 12:20:53 -0500 |
commit | da3c2c2dfcf0fc680a4f49f8f29ab0671f345d61 (patch) | |
tree | 2fea8318b64d66fbd14847d01e94a3bc12886b01 /src/mongo/s/query/cluster_aggregate.cpp | |
parent | cf47aee946c42c246a9176e1df1cd27b12dde685 (diff) | |
download | mongo-da3c2c2dfcf0fc680a4f49f8f29ab0671f345d61.tar.gz |
SERVER-38728 allow pipeline with lookup stage on sharded collection to run on mongod
Diffstat (limited to 'src/mongo/s/query/cluster_aggregate.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 568f96021f4..d2071b01fc6 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/mongos_process_interface.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" @@ -140,13 +141,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, return appendAllowImplicitCreate(aggCmd, true); } -MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( +sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, const AggregationRequest& request, const LiteParsedPipeline& litePipe, BSONObj collationObj, - MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) { + sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) { invariant(!litePipe.hasChangeStream()); auto opCtx = expCtx->opCtx; @@ -183,7 +184,7 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards( + auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards( opCtx, request, litePipe, consumerPipelines.back(), collationObj, boost::none, false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], @@ -216,16 +217,16 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront()); mergeCursors->dismissCursorOwnership(); } - return MongoSInterface::DispatchShardPipelineResults{false, - std::move(ownedCursors), - {} /*TODO SERVER-36279*/, - std::move(splitPipeline), - nullptr, - BSONObj(), - numConsumers}; + return sharded_agg_helpers::DispatchShardPipelineResults{false, + std::move(ownedCursors), + {} /*TODO SERVER-36279*/, + std::move(splitPipeline), + nullptr, + BSONObj(), + numConsumers}; } -Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults, +Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, BSONObjBuilder* result) { if (dispatchResults.splitPipeline) { @@ -287,7 +288,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); - Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request); + Shard::RetryPolicy retryPolicy = sharded_agg_helpers::getDesiredRetryPolicy(request); return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy)); } @@ -599,13 +600,14 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx return getStatusFromCommandResult(result->asTempObj()); } -Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const ClusterAggregate::Namespaces& namespaces, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults, - BSONObjBuilder* result) { +Status dispatchMergingPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ClusterAggregate::Namespaces& namespaces, + const AggregationRequest& request, + const LiteParsedPipeline& litePipe, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults, + BSONObjBuilder* result) { // We should never be in a situation where we call this function on a non-merge pipeline. invariant(shardDispatchResults.splitPipeline); auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); @@ -689,7 +691,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, BSONObjBuilder* result) { uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec()); auto executionNsRoutingInfoStatus = - MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); + sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); boost::optional<CachedCollectionRoutingInfo> routingInfo; LiteParsedPipeline litePipe(request); const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) -> bool { @@ -717,7 +719,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Determine whether this aggregation must be dispatched to all shards in the cluster. const bool mustRunOnAll = - MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe); + sharded_agg_helpers::mustRunOnAllShards(namespaces.executionNss, litePipe); // If we don't have a routing table, then this is a $changeStream which must run on all shards. invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream())); @@ -768,7 +770,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If not, split the pipeline as necessary and dispatch to the relevant shards. - auto shardDispatchResults = MongoSInterface::dispatchShardPipeline( + auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline( expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj); // If the operation is an explain, then we verify that it succeeded on all targeted shards, @@ -846,7 +848,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - MongoSInterface::createPassthroughCommandForShard( + sharded_agg_helpers::createPassthroughCommandForShard( opCtx, aggRequest, shardId, nullptr, BSONObj())); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( |