diff options
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 43 |
1 files changed, 26 insertions, 17 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index acfe13c9494..b52beed565f 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -168,36 +168,44 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger(); const bool needSplit = !singleShard || needPrimaryShardMerger; - // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true, - // 'pipeline' will become the merger side. - boost::intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline.getValue()->splitForSharded() - : pipeline.getValue()); + // Split the pipeline into pieces for mongod(s) and this mongos. It is illegal to use 'pipeline' + // after this point. + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards; + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard; + if (needSplit) { + pipelineForTargetedShards = pipeline.getValue()->splitForSharded(); + pipelineForMergingShard = std::move(pipeline.getValue()); + } else { + pipelineForTargetedShards = std::move(pipeline.getValue()); + } // Create the command for the shards. The 'fromRouter' field means produce output to be // merged. - MutableDocument commandBuilder(request.serializeToCommandObj()); - commandBuilder[AggregationRequest::kPipelineName] = Value(shardPipeline->serialize()); + MutableDocument targetedCommandBuilder(request.serializeToCommandObj()); + targetedCommandBuilder[AggregationRequest::kPipelineName] = + Value(pipelineForTargetedShards->serialize()); if (needSplit) { - commandBuilder[AggregationRequest::kFromRouterName] = Value(true); - commandBuilder[AggregationRequest::kCursorName] = + targetedCommandBuilder[AggregationRequest::kFromRouterName] = Value(true); + targetedCommandBuilder[AggregationRequest::kCursorName] = Value(DOC(AggregationRequest::kBatchSizeName << 0)); } // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. if (mergeCtx->explain) { - commandBuilder.reset(wrapAggAsExplain(commandBuilder.freeze(), *mergeCtx->explain)); + targetedCommandBuilder.reset( + wrapAggAsExplain(targetedCommandBuilder.freeze(), *mergeCtx->explain)); } - BSONObj shardedCommand = commandBuilder.freeze().toBson(); - BSONObj shardQuery = shardPipeline->getInitialQuery(); + BSONObj targetedCommand = targetedCommandBuilder.freeze().toBson(); + BSONObj shardQuery = pipelineForTargetedShards->getInitialQuery(); // Run the command on the shards // TODO need to make sure cursors are killed if a retry is needed std::vector<Strategy::CommandResult> shardResults; Strategy::commandOp(opCtx, namespaces.executionNss.db().toString(), - shardedCommand, + targetedCommand, namespaces.executionNss.ns(), shardQuery, request.getCollation(), @@ -209,9 +217,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, if (needSplit) { *result << "needsPrimaryShardMerger" << needPrimaryShardMerger << "splitPipeline" - << Document{{"shardsPart", shardPipeline->writeExplainOps(*mergeCtx->explain)}, + << Document{{"shardsPart", + pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)}, {"mergerPart", - pipeline.getValue()->writeExplainOps(*mergeCtx->explain)}}; + pipelineForMergingShard->writeExplainOps(*mergeCtx->explain)}}; } else { *result << "splitPipeline" << BSONNULL; } @@ -241,11 +250,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return getStatusFromCommandResult(reply); } - pipeline.getValue()->addInitialSource( + pipelineForMergingShard->addInitialSource( DocumentSourceMergeCursors::create(parseCursors(shardResults), mergeCtx)); MutableDocument mergeCmd(request.serializeToCommandObj()); - mergeCmd["pipeline"] = Value(pipeline.getValue()->serialize()); + mergeCmd["pipeline"] = Value(pipelineForMergingShard->serialize()); mergeCmd["cursor"] = Value(cmdObj["cursor"]); if (cmdObj.hasField(QueryRequest::cmdOptionMaxTimeMS)) { @@ -267,7 +276,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, std::string outputNsOrEmpty; if (DocumentSourceOut* out = - dynamic_cast<DocumentSourceOut*>(pipeline.getValue()->getSources().back().get())) { + dynamic_cast<DocumentSourceOut*>(pipelineForMergingShard->getSources().back().get())) { outputNsOrEmpty = out->getOutputNs().ns(); } |