diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-11-29 09:56:42 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-02-13 07:30:10 -0500 |
commit | 87194fbe0c24525bc1f2d674012fe6978eca77d2 (patch) | |
tree | 0f13b14046152f0b475ae16d9a95b5e2ba0c4cbb /src/mongo/db/pipeline/sharded_agg_helpers.cpp | |
parent | 69f26fa3798b0d7927858aa704243cdac676c6e9 (diff) | |
download | mongo-87194fbe0c24525bc1f2d674012fe6978eca77d2.tar.gz |
SERVER-38311 Change out merging strategy
Allows an $out stage to run in parallel on all shards if the target
collection is sharded and so is the input collection to the aggregate.
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index cb1f44f7092..d5bce605c9d 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -26,7 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -34,6 +34,7 @@ #include "mongo/db/curop.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_out.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/cluster_query_knobs.h" @@ -186,10 +187,15 @@ BSONObj createCommandForTargetedShards( // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream()); - // For split pipelines which need merging, do *not* propagate the writeConcern to the shards - // part. Otherwise this is part of an exchange and in that case we should include the - // writeConcern. - targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + // If there aren't any stages like $out in the pipeline being sent to the shards, remove the + // write concern. The write concern should only be applied when there are writes performed + // to avoid mistakenly waiting for writes which didn't happen. + const auto& shardsPipe = splitPipeline.shardsPipeline->getSources(); + if (!std::any_of(shardsPipe.begin(), shardsPipe.end(), [](const auto& stage) { + return stage->constraints().writesPersistentData(); + })) { + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + } } targetedCmd[AggregationRequest::kCursorName] = @@ -261,6 +267,10 @@ DispatchShardPipelineResults dispatchShardPipeline( boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; if (needsSplit) { + LOG(5) << "Splitting pipeline: " + << "targeting = " << shardIds.size() + << " shards, needsMongosMerge = " << needsMongosMerge + << ", needsPrimaryShardMerge = " << needsPrimaryShardMerge; splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( |