summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregate.cpp
diff options
context:
space:
mode:
authorIan Boros <ian.boros@10gen.com>2019-01-03 16:02:22 -0500
committerIan Boros <ian.boros@10gen.com>2019-01-29 12:20:53 -0500
commitda3c2c2dfcf0fc680a4f49f8f29ab0671f345d61 (patch)
tree2fea8318b64d66fbd14847d01e94a3bc12886b01 /src/mongo/s/query/cluster_aggregate.cpp
parentcf47aee946c42c246a9176e1df1cd27b12dde685 (diff)
downloadmongo-da3c2c2dfcf0fc680a4f49f8f29ab0671f345d61.tar.gz
SERVER-38728 allow pipeline with lookup stage on sharded collection to run on mongod
Diffstat (limited to 'src/mongo/s/query/cluster_aggregate.cpp')
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp48
1 files changed, 25 insertions, 23 deletions
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 568f96021f4..d2071b01fc6 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find_common.h"
@@ -140,13 +141,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
return appendAllowImplicitCreate(aggCmd, true);
}
-MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
+sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
const AggregationRequest& request,
const LiteParsedPipeline& litePipe,
BSONObj collationObj,
- MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) {
+ sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) {
invariant(!litePipe.hasChangeStream());
auto opCtx = expCtx->opCtx;
@@ -183,7 +184,7 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
- auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards(
+ auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards(
opCtx, request, litePipe, consumerPipelines.back(), collationObj, boost::none, false);
requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
@@ -216,16 +217,16 @@ MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
mergeCursors->dismissCursorOwnership();
}
- return MongoSInterface::DispatchShardPipelineResults{false,
- std::move(ownedCursors),
- {} /*TODO SERVER-36279*/,
- std::move(splitPipeline),
- nullptr,
- BSONObj(),
- numConsumers};
+ return sharded_agg_helpers::DispatchShardPipelineResults{false,
+ std::move(ownedCursors),
+ {} /*TODO SERVER-36279*/,
+ std::move(splitPipeline),
+ nullptr,
+ BSONObj(),
+ numConsumers};
}
-Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults,
+Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
BSONObjBuilder* result) {
if (dispatchResults.splitPipeline) {
@@ -287,7 +288,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request);
+ Shard::RetryPolicy retryPolicy = sharded_agg_helpers::getDesiredRetryPolicy(request);
return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
}
@@ -599,13 +600,14 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
return getStatusFromCommandResult(result->asTempObj());
}
-Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ClusterAggregate::Namespaces& namespaces,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults,
- BSONObjBuilder* result) {
+Status dispatchMergingPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults,
+ BSONObjBuilder* result) {
// We should never be in a situation where we call this function on a non-merge pipeline.
invariant(shardDispatchResults.splitPipeline);
auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
@@ -689,7 +691,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
BSONObjBuilder* result) {
uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
auto executionNsRoutingInfoStatus =
- MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
+ sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
boost::optional<CachedCollectionRoutingInfo> routingInfo;
LiteParsedPipeline litePipe(request);
const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) -> bool {
@@ -717,7 +719,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Determine whether this aggregation must be dispatched to all shards in the cluster.
const bool mustRunOnAll =
- MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe);
+ sharded_agg_helpers::mustRunOnAllShards(namespaces.executionNss, litePipe);
// If we don't have a routing table, then this is a $changeStream which must run on all shards.
invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
@@ -768,7 +770,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+ auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
@@ -846,7 +848,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- MongoSInterface::createPassthroughCommandForShard(
+ sharded_agg_helpers::createPassthroughCommandForShard(
opCtx, aggRequest, shardId, nullptr, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(