summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp91
1 files changed, 45 insertions, 46 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index d4aac818133..cbcb8b3c3ac 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -245,15 +245,15 @@ BSONObj createCommandForMergingShard(
return mergeCmd.freeze().toBson();
}
-StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>
-establishShardCursorsWithoutRetrying(OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
std::set<ShardId> shardIds =
@@ -306,7 +306,7 @@ establishShardCursorsWithoutRetrying(OperationContext* opCtx,
return swCursors;
}
-struct EstablishShardCursorsResults {
+struct DispatchShardPipelineResults {
// True if this pipeline was split, and the second half of the pipeline needs to be run on the
// primary shard for the database.
bool needsPrimaryShardMerge;
@@ -331,7 +331,7 @@ struct EstablishShardCursorsResults {
* the pipeline that will need to be executed to merge the results from the remotes. If a stale
* shard version is encountered, refreshes the routing table and tries again.
*/
-StatusWith<EstablishShardCursorsResults> establishShardCursors(
+StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
BSONObj originalCmdObj,
@@ -341,8 +341,7 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors(
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursorsWithoutRetrying to dispatch the aggregation to the targeted
- // shards.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
// - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
// the refreshed routing table data.
// - If the pipeline is not split and we now need to target multiple shards, split it. If the
@@ -431,14 +430,14 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors(
nullptr /* viewDefinition */);
}
} else {
- swCursors = establishShardCursorsWithoutRetrying(opCtx,
- executionNss,
- liteParsedPipeline,
- &executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
+ swCursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ &executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery,
+ aggRequest.getCollation());
if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
@@ -455,7 +454,7 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors(
if (!swCursors.isOK()) {
return swCursors.getStatus();
}
- return EstablishShardCursorsResults{needsPrimaryShardMerge,
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
std::move(swCursors.getValue()),
std::move(swShardResults.getValue()),
std::move(pipelineForTargetedShards),
@@ -662,32 +661,32 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return getStatusFromCommandResult(result->asTempObj());
}
- auto targetingResults = uassertStatusOK(establishShardCursors(mergeCtx,
- namespaces.executionNss,
- cmdObj,
- request,
- liteParsedPipeline,
- std::move(pipeline)));
+ auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx,
+ namespaces.executionNss,
+ cmdObj,
+ request,
+ liteParsedPipeline,
+ std::move(pipeline)));
if (mergeCtx->explain) {
// If we reach here, we've either succeeded in running the explain or exhausted all
// attempts. In either case, attempt to append the explain results to the output builder.
- uassertAllShardsSupportExplain(targetingResults.remoteExplainOutput);
+ uassertAllShardsSupportExplain(dispatchResults.remoteExplainOutput);
- return appendExplainResults(std::move(targetingResults.remoteExplainOutput),
+ return appendExplainResults(std::move(dispatchResults.remoteExplainOutput),
mergeCtx,
- targetingResults.pipelineForTargetedShards,
- targetingResults.pipelineForMerging,
+ dispatchResults.pipelineForTargetedShards,
+ dispatchResults.pipelineForMerging,
result);
}
- invariant(targetingResults.remoteCursors.size() > 0);
+ invariant(dispatchResults.remoteCursors.size() > 0);
// If we dispatched to a single shard, store the remote cursor and return immediately.
- if (!targetingResults.pipelineForTargetedShards->isSplitForShards()) {
- invariant(targetingResults.remoteCursors.size() == 1);
- const auto& remoteCursor = targetingResults.remoteCursors[0];
+ if (!dispatchResults.pipelineForTargetedShards->isSplitForShards()) {
+ invariant(dispatchResults.remoteCursors.size() == 1);
+ const auto& remoteCursor = dispatchResults.remoteCursors[0];
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
@@ -703,7 +702,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If we reach here, we have a merge pipeline to dispatch.
- auto mergingPipeline = std::move(targetingResults.pipelineForMerging);
+ auto mergingPipeline = std::move(dispatchResults.pipelineForMerging);
invariant(mergingPipeline);
// First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
@@ -716,7 +715,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
request,
namespaces.requestedNss,
std::move(mergingPipeline),
- std::move(targetingResults.remoteCursors));
+ std::move(dispatchResults.remoteCursors));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -726,16 +725,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// If we cannot merge on mongoS, establish the merge cursor on a shard.
mergingPipeline->addInitialSource(
- DocumentSourceMergeCursors::create(parseCursors(targetingResults.remoteCursors), mergeCtx));
+ DocumentSourceMergeCursors::create(parseCursors(dispatchResults.remoteCursors), mergeCtx));
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
- auto mergeResponse = uassertStatusOK(establishMergingShardCursor(
- opCtx,
- namespaces.executionNss,
- targetingResults.remoteCursors,
- mergeCmdObj,
- boost::optional<ShardId>{targetingResults.needsPrimaryShardMerge,
- executionNsRoutingInfo.primaryId()}));
+ auto mergeResponse = uassertStatusOK(
+ establishMergingShardCursor(opCtx,
+ namespaces.executionNss,
+ dispatchResults.remoteCursors,
+ mergeCmdObj,
+ boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge,
+ executionNsRoutingInfo.primaryId()}));
auto mergingShardId = mergeResponse.first;
auto response = mergeResponse.second;