diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 2654463431a..7e7577dbdb3 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -102,8 +102,8 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v Status appendExplainResults( const std::vector<AsyncRequestsSender::Response>& shardResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging, BSONObjBuilder* result) { if (pipelineForTargetedShards->isSplitForShards()) { *result << "mergeType" @@ -202,7 +202,7 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx, BSONObj createCommandForTargetedShards( const AggregationRequest& request, const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards) { + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) { // Create the command for the shards. MutableDocument targetedCmd(request.serializeToCommandObj()); targetedCmd[AggregationRequest::kFromMongosName] = Value(true); @@ -237,7 +237,7 @@ BSONObj createCommandForMergingShard( const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging) { + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); @@ -338,10 +338,10 @@ struct DispatchShardPipelineResults { // The half of the pipeline that was sent to each shard, or the entire pipeline if there was // only one shard targeted. - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards; + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; // The command object to send to the targeted shards. BSONObj commandForTargetedShards; @@ -358,7 +358,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( BSONObj originalCmdObj, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { // The process is as follows: // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. @@ -381,7 +381,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( const auto shardQuery = pipeline->getInitialQuery(); auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; BSONObj targetedCommand; int numAttempts = 0; @@ -520,17 +520,13 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs return {{std::move(mergingShardId), std::move(shardCmdResponse)}}; } -BSONObj establishMergingMongosCursor( - OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { - - // Inject the MongosProcessInterface for sources which need it. - PipelineS::injectMongosInterface(pipelineForMerging.get()); +BSONObj establishMergingMongosCursor(OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, + std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { ClusterClientCursorParams params( requestedNss, @@ -732,7 +728,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } boost::intrusive_ptr<ExpressionContext> mergeCtx = - new ExpressionContext(opCtx, request, std::move(collation), std::move(resolvedNamespaces)); + new ExpressionContext(opCtx, + request, + std::move(collation), + std::make_shared<PipelineS::MongoSProcessInterface>(), + std::move(resolvedNamespaces)); mergeCtx->inMongos = true; // explicitly *not* setting mergeCtx->tempDir |