diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-22 05:30:45 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-26 04:28:41 +0100 |
commit | 253d4c559450238d786e03d3c3118f02e6b9358f (patch) | |
tree | 8cd0a5539c93ae8fb28676c63cf9d6b339d71358 | |
parent | 81c5724d9d11a1ff2b16937dc01e6439d9e0f266 (diff) | |
download | mongo-253d4c559450238d786e03d3c3118f02e6b9358f.tar.gz |
SERVER-31674 Rename misleading functions and structs in cluster_aggregate.cpp
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 91 |
1 files changed, 45 insertions, 46 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index d4aac818133..cbcb8b3c3ac 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -245,15 +245,15 @@ BSONObj createCommandForMergingShard( return mergeCmd.freeze().toBson(); } -StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> -establishShardCursorsWithoutRetrying(OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { +StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; std::set<ShardId> shardIds = @@ -306,7 +306,7 @@ establishShardCursorsWithoutRetrying(OperationContext* opCtx, return swCursors; } -struct EstablishShardCursorsResults { +struct DispatchShardPipelineResults { // True if this pipeline was split, and the second half of the pipeline needs to be run on the // primary shard for the database. bool needsPrimaryShardMerge; @@ -331,7 +331,7 @@ struct EstablishShardCursorsResults { * the pipeline that will need to be executed to merge the results from the remotes. If a stale * shard version is encountered, refreshes the routing table and tries again. */ -StatusWith<EstablishShardCursorsResults> establishShardCursors( +StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, BSONObj originalCmdObj, @@ -341,8 +341,7 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors( // The process is as follows: // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursorsWithoutRetrying to dispatch the aggregation to the targeted - // shards. + // - Call establishShardCursors to dispatch the aggregation to the targeted shards. // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with // the refreshed routing table data. // - If the pipeline is not split and we now need to target multiple shards, split it. If the @@ -431,14 +430,14 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors( nullptr /* viewDefinition */); } } else { - swCursors = establishShardCursorsWithoutRetrying(opCtx, - executionNss, - liteParsedPipeline, - &executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - shardQuery, - aggRequest.getCollation()); + swCursors = establishShardCursors(opCtx, + executionNss, + liteParsedPipeline, + &executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + shardQuery, + aggRequest.getCollation()); if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { LOG(1) << "got stale shardVersion error " << swCursors.getStatus() @@ -455,7 +454,7 @@ StatusWith<EstablishShardCursorsResults> establishShardCursors( if (!swCursors.isOK()) { return swCursors.getStatus(); } - return EstablishShardCursorsResults{needsPrimaryShardMerge, + return DispatchShardPipelineResults{needsPrimaryShardMerge, std::move(swCursors.getValue()), std::move(swShardResults.getValue()), std::move(pipelineForTargetedShards), @@ -662,32 +661,32 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return getStatusFromCommandResult(result->asTempObj()); } - auto targetingResults = uassertStatusOK(establishShardCursors(mergeCtx, - namespaces.executionNss, - cmdObj, - request, - liteParsedPipeline, - std::move(pipeline))); + auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx, + namespaces.executionNss, + cmdObj, + request, + liteParsedPipeline, + std::move(pipeline))); if (mergeCtx->explain) { // If we reach here, we've either succeeded in running the explain or exhausted all // attempts. In either case, attempt to append the explain results to the output builder. - uassertAllShardsSupportExplain(targetingResults.remoteExplainOutput); + uassertAllShardsSupportExplain(dispatchResults.remoteExplainOutput); - return appendExplainResults(std::move(targetingResults.remoteExplainOutput), + return appendExplainResults(std::move(dispatchResults.remoteExplainOutput), mergeCtx, - targetingResults.pipelineForTargetedShards, - targetingResults.pipelineForMerging, + dispatchResults.pipelineForTargetedShards, + dispatchResults.pipelineForMerging, result); } - invariant(targetingResults.remoteCursors.size() > 0); + invariant(dispatchResults.remoteCursors.size() > 0); // If we dispatched to a single shard, store the remote cursor and return immediately. - if (!targetingResults.pipelineForTargetedShards->isSplitForShards()) { - invariant(targetingResults.remoteCursors.size() == 1); - const auto& remoteCursor = targetingResults.remoteCursors[0]; + if (!dispatchResults.pipelineForTargetedShards->isSplitForShards()) { + invariant(dispatchResults.remoteCursors.size() == 1); + const auto& remoteCursor = dispatchResults.remoteCursors[0]; auto executorPool = Grid::get(opCtx)->getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor( opCtx, @@ -703,7 +702,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If we reach here, we have a merge pipeline to dispatch. - auto mergingPipeline = std::move(targetingResults.pipelineForMerging); + auto mergingPipeline = std::move(dispatchResults.pipelineForMerging); invariant(mergingPipeline); // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, @@ -716,7 +715,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, request, namespaces.requestedNss, std::move(mergingPipeline), - std::move(targetingResults.remoteCursors)); + std::move(dispatchResults.remoteCursors)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -726,16 +725,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // If we cannot merge on mongoS, establish the merge cursor on a shard. mergingPipeline->addInitialSource( - DocumentSourceMergeCursors::create(parseCursors(targetingResults.remoteCursors), mergeCtx)); + DocumentSourceMergeCursors::create(parseCursors(dispatchResults.remoteCursors), mergeCtx)); auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline); - auto mergeResponse = uassertStatusOK(establishMergingShardCursor( - opCtx, - namespaces.executionNss, - targetingResults.remoteCursors, - mergeCmdObj, - boost::optional<ShardId>{targetingResults.needsPrimaryShardMerge, - executionNsRoutingInfo.primaryId()})); + auto mergeResponse = uassertStatusOK( + establishMergingShardCursor(opCtx, + namespaces.executionNss, + dispatchResults.remoteCursors, + mergeCmdObj, + boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge, + executionNsRoutingInfo.primaryId()})); auto mergingShardId = mergeResponse.first; auto response = mergeResponse.second; |