summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/sharded_agg_helpers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp20
1 files changed, 15 insertions, 5 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index cb1f44f7092..d5bce605c9d 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -26,7 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -34,6 +34,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/query/cluster_query_knobs.h"
@@ -186,10 +187,15 @@ BSONObj createCommandForTargetedShards(
// TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream());
- // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
- // part. Otherwise this is part of an exchange and in that case we should include the
- // writeConcern.
- targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ // If there aren't any stages like $out in the pipeline being sent to the shards, remove the
+ // write concern. The write concern should only be applied when there are writes performed
+ // to avoid mistakenly waiting for writes which didn't happen.
+ const auto& shardsPipe = splitPipeline.shardsPipeline->getSources();
+ if (!std::any_of(shardsPipe.begin(), shardsPipe.end(), [](const auto& stage) {
+ return stage->constraints().writesPersistentData();
+ })) {
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ }
}
targetedCmd[AggregationRequest::kCursorName] =
@@ -261,6 +267,10 @@ DispatchShardPipelineResults dispatchShardPipeline(
boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
if (needsSplit) {
+ LOG(5) << "Splitting pipeline: "
+ << "targeting = " << shardIds.size()
+ << " shards, needsMongosMerge = " << needsMongosMerge
+ << ", needsPrimaryShardMerge = " << needsPrimaryShardMerge;
splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(