summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp40
1 files changed, 20 insertions, 20 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 2654463431a..7e7577dbdb3 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -102,8 +102,8 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
Status appendExplainResults(
const std::vector<AsyncRequestsSender::Response>& shardResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging,
BSONObjBuilder* result) {
if (pipelineForTargetedShards->isSplitForShards()) {
*result << "mergeType"
@@ -202,7 +202,7 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx,
BSONObj createCommandForTargetedShards(
const AggregationRequest& request,
const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards) {
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) {
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
@@ -237,7 +237,7 @@ BSONObj createCommandForMergingShard(
const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging) {
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
@@ -338,10 +338,10 @@ struct DispatchShardPipelineResults {
// The half of the pipeline that was sent to each shard, or the entire pipeline if there was
// only one shard targeted.
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
// The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
// The command object to send to the targeted shards.
BSONObj commandForTargetedShards;
@@ -358,7 +358,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
@@ -381,7 +381,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
const auto shardQuery = pipeline->getInitialQuery();
auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
BSONObj targetedCommand;
int numAttempts = 0;
@@ -520,17 +520,13 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs
return {{std::move(mergingShardId), std::move(shardCmdResponse)}};
}
-BSONObj establishMergingMongosCursor(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
-
- // Inject the MongosProcessInterface for sources which need it.
- PipelineS::injectMongosInterface(pipelineForMerging.get());
+BSONObj establishMergingMongosCursor(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
ClusterClientCursorParams params(
requestedNss,
@@ -732,7 +728,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
boost::intrusive_ptr<ExpressionContext> mergeCtx =
- new ExpressionContext(opCtx, request, std::move(collation), std::move(resolvedNamespaces));
+ new ExpressionContext(opCtx,
+ request,
+ std::move(collation),
+ std::make_shared<PipelineS::MongoSProcessInterface>(),
+ std::move(resolvedNamespaces));
mergeCtx->inMongos = true;
// explicitly *not* setting mergeCtx->tempDir