diff options
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index e305217ed2d..fedea813ff1 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -596,15 +596,18 @@ DispatchShardPipelineResults dispatchShardPipeline( } // Generate the command object for the targeted shards. - BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards( - expCtx, serializedCommand, *splitPipeline, exchangeSpec, true) - : createPassthroughCommandForShard(expCtx, - serializedCommand, - expCtx->explain, - expCtx->getRuntimeConstants(), - pipeline.get(), - collationObj); + BSONObj targetedCommand = applyReadWriteConcern( + opCtx, + true, /* appendRC */ + !expCtx->explain, /* appendWC */ + splitPipeline ? createCommandForTargetedShards( + expCtx, serializedCommand, *splitPipeline, exchangeSpec, true) + : createPassthroughCommandForShard(expCtx, + serializedCommand, + expCtx->explain, + expCtx->getRuntimeConstants(), + pipeline.get(), + collationObj)); // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the // config server in order to monitor for new shards. To guarantee that we do not miss any @@ -1082,9 +1085,12 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. - BSONObj cmdObj = + BSONObj cmdObj = applyReadWriteConcern( + opCtx, + true, /* appendRC */ + !explain, /* appendWC */ CommandHelpers::filterCommandRequestForPassthrough(createPassthroughCommandForShard( - expCtx, serializedCommand, explain, boost::none, nullptr, BSONObj())); + expCtx, serializedCommand, explain, boost::none, nullptr, BSONObj()))); const auto shardId = dbInfo.primary()->getId(); const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId) |