summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/sharded_agg_helpers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index e305217ed2d..fedea813ff1 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -596,15 +596,18 @@ DispatchShardPipelineResults dispatchShardPipeline(
}
// Generate the command object for the targeted shards.
- BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(
- expCtx, serializedCommand, *splitPipeline, exchangeSpec, true)
- : createPassthroughCommandForShard(expCtx,
- serializedCommand,
- expCtx->explain,
- expCtx->getRuntimeConstants(),
- pipeline.get(),
- collationObj);
+ BSONObj targetedCommand = applyReadWriteConcern(
+ opCtx,
+ true, /* appendRC */
+ !expCtx->explain, /* appendWC */
+ splitPipeline ? createCommandForTargetedShards(
+ expCtx, serializedCommand, *splitPipeline, exchangeSpec, true)
+ : createPassthroughCommandForShard(expCtx,
+ serializedCommand,
+ expCtx->explain,
+ expCtx->getRuntimeConstants(),
+ pipeline.get(),
+ collationObj));
// A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
// config server in order to monitor for new shards. To guarantee that we do not miss any
@@ -1082,9 +1085,12 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
- BSONObj cmdObj =
+ BSONObj cmdObj = applyReadWriteConcern(
+ opCtx,
+ true, /* appendRC */
+ !explain, /* appendWC */
CommandHelpers::filterCommandRequestForPassthrough(createPassthroughCommandForShard(
- expCtx, serializedCommand, explain, boost::none, nullptr, BSONObj()));
+ expCtx, serializedCommand, explain, boost::none, nullptr, BSONObj())));
const auto shardId = dbInfo.primary()->getId();
const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId)