summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_aggregate.cpp')
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp43
1 files changed, 26 insertions, 17 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index acfe13c9494..b52beed565f 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -168,36 +168,44 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger();
const bool needSplit = !singleShard || needPrimaryShardMerger;
- // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true,
- // 'pipeline' will become the merger side.
- boost::intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline.getValue()->splitForSharded()
- : pipeline.getValue());
+ // Split the pipeline into pieces for mongod(s) and this mongos. It is illegal to use 'pipeline'
+ // after this point.
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards;
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard;
+ if (needSplit) {
+ pipelineForTargetedShards = pipeline.getValue()->splitForSharded();
+ pipelineForMergingShard = std::move(pipeline.getValue());
+ } else {
+ pipelineForTargetedShards = std::move(pipeline.getValue());
+ }
// Create the command for the shards. The 'fromRouter' field means produce output to be
// merged.
- MutableDocument commandBuilder(request.serializeToCommandObj());
- commandBuilder[AggregationRequest::kPipelineName] = Value(shardPipeline->serialize());
+ MutableDocument targetedCommandBuilder(request.serializeToCommandObj());
+ targetedCommandBuilder[AggregationRequest::kPipelineName] =
+ Value(pipelineForTargetedShards->serialize());
if (needSplit) {
- commandBuilder[AggregationRequest::kFromRouterName] = Value(true);
- commandBuilder[AggregationRequest::kCursorName] =
+ targetedCommandBuilder[AggregationRequest::kFromRouterName] = Value(true);
+ targetedCommandBuilder[AggregationRequest::kCursorName] =
Value(DOC(AggregationRequest::kBatchSizeName << 0));
}
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (mergeCtx->explain) {
- commandBuilder.reset(wrapAggAsExplain(commandBuilder.freeze(), *mergeCtx->explain));
+ targetedCommandBuilder.reset(
+ wrapAggAsExplain(targetedCommandBuilder.freeze(), *mergeCtx->explain));
}
- BSONObj shardedCommand = commandBuilder.freeze().toBson();
- BSONObj shardQuery = shardPipeline->getInitialQuery();
+ BSONObj targetedCommand = targetedCommandBuilder.freeze().toBson();
+ BSONObj shardQuery = pipelineForTargetedShards->getInitialQuery();
// Run the command on the shards
// TODO need to make sure cursors are killed if a retry is needed
std::vector<Strategy::CommandResult> shardResults;
Strategy::commandOp(opCtx,
namespaces.executionNss.db().toString(),
- shardedCommand,
+ targetedCommand,
namespaces.executionNss.ns(),
shardQuery,
request.getCollation(),
@@ -209,9 +217,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
if (needSplit) {
*result << "needsPrimaryShardMerger" << needPrimaryShardMerger << "splitPipeline"
- << Document{{"shardsPart", shardPipeline->writeExplainOps(*mergeCtx->explain)},
+ << Document{{"shardsPart",
+ pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)},
{"mergerPart",
- pipeline.getValue()->writeExplainOps(*mergeCtx->explain)}};
+ pipelineForMergingShard->writeExplainOps(*mergeCtx->explain)}};
} else {
*result << "splitPipeline" << BSONNULL;
}
@@ -241,11 +250,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return getStatusFromCommandResult(reply);
}
- pipeline.getValue()->addInitialSource(
+ pipelineForMergingShard->addInitialSource(
DocumentSourceMergeCursors::create(parseCursors(shardResults), mergeCtx));
MutableDocument mergeCmd(request.serializeToCommandObj());
- mergeCmd["pipeline"] = Value(pipeline.getValue()->serialize());
+ mergeCmd["pipeline"] = Value(pipelineForMergingShard->serialize());
mergeCmd["cursor"] = Value(cmdObj["cursor"]);
if (cmdObj.hasField(QueryRequest::cmdOptionMaxTimeMS)) {
@@ -267,7 +276,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
std::string outputNsOrEmpty;
if (DocumentSourceOut* out =
- dynamic_cast<DocumentSourceOut*>(pipeline.getValue()->getSources().back().get())) {
+ dynamic_cast<DocumentSourceOut*>(pipelineForMergingShard->getSources().back().get())) {
outputNsOrEmpty = out->getOutputNs().ns();
}