author    Bernard Gorman <bernard.gorman@gmail.com>    2018-11-25 03:37:30 +0000
committer Bernard Gorman <bernard.gorman@gmail.com>    2019-01-09 02:59:49 +0000
commit    16c7973fc8037ee0b7bd12116e5ff25084af142b (patch)
tree      a357dc1ba14d29dea7e9012907dabe6d1ad75946 /src/mongo/db
parent    214bf238fedc4e147e6473f5fc64428987added6 (diff)
download  mongo-16c7973fc8037ee0b7bd12116e5ff25084af142b.tar.gz
SERVER-38410 Allow ARM to consume postBatchResumeToken
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/exec/change_stream_proxy.cpp                    |  6
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.cpp                | 10
-rw-r--r--  src/mongo/db/pipeline/aggregation_request.h                  | 22
-rw-r--r--  src/mongo/db/pipeline/aggregation_request_test.cpp           | 19
-rw-r--r--  src/mongo/db/pipeline/change_stream_constants.h              |  3
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp | 75
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 18
-rw-r--r--  src/mongo/db/pipeline/document_source_check_invalidate.cpp   | 15
-rw-r--r--  src/mongo/db/pipeline/document_source_replace_root.cpp       |  4
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp                 |  2
-rw-r--r--  src/mongo/db/pipeline/expression_context.h                   |  1
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.cpp           | 16
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h             |  3
13 files changed, 168 insertions(+), 26 deletions(-)
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
index 5af30b4b5a3..3fc83586e37 100644
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ b/src/mongo/db/exec/change_stream_proxy.cpp
@@ -51,12 +51,10 @@ ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
if (auto next = _pipeline->getNext()) {
// While we have more results to return, we track both the timestamp and the resume token of
- // the latest event observed in the oplog, the latter via its _id field.
+ // the latest event observed in the oplog, the latter via its sort key metadata field.
auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson());
_latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
- if (next->getField("_id").getType() == BSONType::Object) {
- _postBatchResumeToken = next->getField("_id").getDocument().toBson();
- }
+ _postBatchResumeToken = next->getSortKeyMetaField();
return nextBSON;
}
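
To make the pattern concrete, here is a minimal self-contained sketch of the PBRT-tracking loop; all types and names are illustrative stand-ins, not MongoDB internals, and the opaque sort-key string stands in for the resume-token metadata that getSortKeyMetaField() returns.

#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct Event {
    std::string body;     // the change event itself
    std::string sortKey;  // stand-in for the resume-token sort key metadata
};

class ChangeStreamProxySketch {
public:
    explicit ChangeStreamProxySketch(std::vector<Event> events)
        : _events(std::move(events)) {}

    // Mirrors getNextBson() above: each returned event advances the PBRT.
    std::optional<std::string> getNext() {
        if (_pos == _events.size())
            return std::nullopt;
        const Event& next = _events[_pos++];
        _postBatchResumeToken = next.sortKey;
        return next.body;
    }

    const std::string& postBatchResumeToken() const {
        return _postBatchResumeToken;
    }

private:
    std::vector<Event> _events;
    std::size_t _pos = 0;
    std::string _postBatchResumeToken;
};
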
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 9a45b38ec17..e1fb48d7634 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -54,6 +54,7 @@ constexpr StringData AggregationRequest::kCursorName;
constexpr StringData AggregationRequest::kBatchSizeName;
constexpr StringData AggregationRequest::kFromMongosName;
constexpr StringData AggregationRequest::kNeedsMergeName;
+constexpr StringData AggregationRequest::kMergeByPBRTName;
constexpr StringData AggregationRequest::kPipelineName;
constexpr StringData AggregationRequest::kCollationName;
constexpr StringData AggregationRequest::kExplainName;
@@ -204,6 +205,14 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
hasNeedsMergeElem = true;
request.setNeedsMerge(elem.Bool());
+ } else if (kMergeByPBRTName == fieldName) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kMergeByPBRTName << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+
+ request.setMergeByPBRT(elem.Bool());
} else if (kAllowDiskUseName == fieldName) {
if (storageGlobalParams.readOnly) {
return {ErrorCodes::IllegalOperation,
@@ -313,6 +322,7 @@ Document AggregationRequest::serializeToCommandObj() const {
{kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
{kFromMongosName, _fromMongos ? Value(true) : Value()},
{kNeedsMergeName, _needsMerge ? Value(true) : Value()},
+ {kMergeByPBRTName, _mergeByPBRT ? Value(true) : Value()},
{bypassDocumentValidationCommandOption(),
_bypassDocumentValidation ? Value(true) : Value()},
// Only serialize a collation if one was specified.
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 22134763ac9..cac78b57050 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -56,6 +56,7 @@ public:
static constexpr StringData kBatchSizeName = "batchSize"_sd;
static constexpr StringData kFromMongosName = "fromMongos"_sd;
static constexpr StringData kNeedsMergeName = "needsMerge"_sd;
+ static constexpr StringData kMergeByPBRTName = "mergeByPBRT"_sd;
static constexpr StringData kPipelineName = "pipeline"_sd;
static constexpr StringData kCollationName = "collation"_sd;
static constexpr StringData kExplainName = "explain"_sd;
@@ -152,6 +153,22 @@ public:
return _needsMerge;
}
+ /**
+ * Returns true if this request is a change stream pipeline which originated from a mongoS that
+ * can merge based on the documents' raw resume tokens and the 'postBatchResumeToken' field. If
+ * not, then the mongoD will need to produce the old {ts, uuid, docKey} $sortKey format instead.
+ * TODO SERVER-38539: this flag is no longer necessary in 4.4, since all change streams will be
+ * merged using raw resume tokens and PBRTs. This mechanism was chosen over FCV for two reasons:
+ * first, this code is intended for backport to 4.0, where the same issue exists but FCV
+ * cannot be leveraged; second, FCV can be changed at any point during runtime, but mongoS
+ * cannot dynamically switch from one $sortKey format to another mid-stream. The 'mergeByPBRT'
+ * flag allows the mongoS to dictate which $sortKey format will be used, and it will stay
+ * consistent for the entire duration of the stream.
+ */
+ bool mergeByPBRT() const {
+ return _mergeByPBRT;
+ }
+
bool shouldAllowDiskUse() const {
return _allowDiskUse;
}
@@ -239,6 +256,10 @@ public:
_needsMerge = needsMerge;
}
+ void setMergeByPBRT(bool mergeByPBRT) {
+ _mergeByPBRT = mergeByPBRT;
+ }
+
void setBypassDocumentValidation(bool shouldBypassDocumentValidation) {
_bypassDocumentValidation = shouldBypassDocumentValidation;
}
@@ -299,6 +320,7 @@ private:
bool _allowDiskUse = false;
bool _fromMongos = false;
bool _needsMerge = false;
+ bool _mergeByPBRT = false;
bool _bypassDocumentValidation = false;
// A user-specified maxTimeMS limit, or a value of '0' if not specified.
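
A rough sketch of the flag's parse-time validation and default, using a plain map in place of BSON; only the 'mergeByPBRT' field name comes from the patch, everything else is illustrative.

#include <iostream>
#include <map>
#include <string>
#include <variant>

using FieldValue = std::variant<bool, int, std::string>;

struct ParseResult {
    bool ok = true;
    std::string error;
    bool mergeByPBRT = false;  // matches the default of _mergeByPBRT above
};

ParseResult parseMergeByPBRT(const std::map<std::string, FieldValue>& cmd) {
    auto it = cmd.find("mergeByPBRT");
    if (it == cmd.end())
        return {};  // field absent: the flag stays false, as in the header's default
    if (!std::holds_alternative<bool>(it->second))
        return {false, "mergeByPBRT must be a boolean", false};
    return {true, "", std::get<bool>(it->second)};
}

int main() {
    std::map<std::string, FieldValue> bad{{"mergeByPBRT", 1}};  // non-bool, as in the new unit test
    std::cout << parseMergeByPBRT(bad).error << "\n";           // prints the type error
}
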
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index 502319d12dc..7b6e90008d5 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -59,15 +59,16 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
NamespaceString nss("a.collection");
const BSONObj inputBson = fromjson(
"{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, "
- "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: "
- "{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, "
- "$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', exchange: {policy: "
- "'roundrobin', consumers:NumberInt(2)}}");
+ "needsMerge: true, mergeByPBRT: true, bypassDocumentValidation: true, collation: {locale: "
+ "'en_US'}, cursor: {batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: "
+ "'linearizable'}, $queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', "
+ "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}}");
auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
ASSERT_FALSE(request.getExplain());
ASSERT_TRUE(request.shouldAllowDiskUse());
ASSERT_TRUE(request.isFromMongos());
ASSERT_TRUE(request.needsMerge());
+ ASSERT_TRUE(request.mergeByPBRT());
ASSERT_TRUE(request.shouldBypassDocumentValidation());
ASSERT_EQ(request.getBatchSize(), 10);
ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1));
@@ -155,6 +156,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau
request.setAllowDiskUse(false);
request.setFromMongos(false);
request.setNeedsMerge(false);
+ request.setMergeByPBRT(false);
request.setBypassDocumentValidation(false);
request.setCollation(BSONObj());
request.setHint(BSONObj());
@@ -176,6 +178,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
request.setAllowDiskUse(true);
request.setFromMongos(true);
request.setNeedsMerge(true);
+ request.setMergeByPBRT(true);
request.setBypassDocumentValidation(true);
request.setBatchSize(10);
request.setMaxTimeMS(10u);
@@ -199,6 +202,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
{AggregationRequest::kAllowDiskUseName, true},
{AggregationRequest::kFromMongosName, true},
{AggregationRequest::kNeedsMergeName, true},
+ {AggregationRequest::kMergeByPBRTName, true},
{bypassDocumentValidationCommandOption(), true},
{AggregationRequest::kCollationName, collationObj},
{AggregationRequest::kCursorName,
@@ -377,6 +381,13 @@ TEST(AggregationRequestTest, ShouldRejectFromMongosIfNeedsMerge34AlsoPresent) {
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
+TEST(AggregationRequestTest, ShouldRejectNonBoolMergeByPBRT) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson(
+ "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, mergeByPBRT: 1, fromMongos: true}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) {
NamespaceString nss("a.collection");
const BSONObj inputBson =
diff --git a/src/mongo/db/pipeline/change_stream_constants.h b/src/mongo/db/pipeline/change_stream_constants.h
index 58488054518..71d1c3f2305 100644
--- a/src/mongo/db/pipeline/change_stream_constants.h
+++ b/src/mongo/db/pipeline/change_stream_constants.h
@@ -36,8 +36,7 @@
namespace mongo {
namespace change_stream_constants {
-const BSONObj kSortSpec =
- BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1);
+const BSONObj kSortSpec = BSON("_id._data" << 1);
} // namespace change_stream_constants
} // namespace mongo
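
The old spec decomposed the resume token into its {clusterTime, uuid, documentKey} components for merging; the new spec sorts on the single opaque token string. A toy illustration, assuming (as the hex-encoded resume token is designed to guarantee, though not shown here) that lexicographic order of the encoded token matches event order:

#include <algorithm>
#include <string>
#include <vector>

struct ShardResult {
    std::string resumeTokenData;  // stand-in for the _id._data token string
    std::string payload;
};

// Merge comparator under the new single-field sort spec {"_id._data": 1}.
bool byTokenAsc(const ShardResult& a, const ShardResult& b) {
    return a.resumeTokenData < b.resumeTokenData;
}

int main() {
    // Token values are made up; only their relative order matters.
    std::vector<ShardResult> results{{"82AF01", "second event"}, {"82AE07", "first event"}};
    std::sort(results.begin(), results.end(), byTokenAsc);
    // results[0] is now "first event": one string comparison replaces the
    // old three-way {ts, uuid, documentKey} comparison.
}
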
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 418bd7ee8a6..1fffad0e43b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1225,6 +1225,81 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
ErrorCodes::InvalidResumeToken);
}
+TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) {
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("x" << 2 << "_id" << 1), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ boost::none); // o2
+
+ auto stages = makeStages(insert.toBSON(), kDefaultSpec);
+
+ getExpCtx()->mongoProcessInterface =
+ std::make_unique<MockMongoInterface>(std::vector<FieldPath>{{"x"}, {"_id"}});
+
+ getExpCtx()->mergeByPBRT = false;
+ getExpCtx()->needsMerge = false;
+
+ auto next = stages.back()->getNext();
+
+ auto expectedSortKey =
+ makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)).toBson();
+
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_BSONOBJ_EQ(next.releaseDocument().getSortKeyMetaField(), expectedSortKey);
+}
+
+TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfMergeByPBRTIsTrue) {
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("x" << 2 << "_id" << 1), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ boost::none); // o2
+
+ auto stages = makeStages(insert.toBSON(), kDefaultSpec);
+
+ getExpCtx()->mongoProcessInterface =
+ std::make_unique<MockMongoInterface>(std::vector<FieldPath>{{"x"}, {"_id"}});
+
+ getExpCtx()->mergeByPBRT = true;
+ getExpCtx()->needsMerge = true;
+
+ auto next = stages.back()->getNext();
+
+ auto expectedSortKey =
+ makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)).toBson();
+
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_BSONOBJ_EQ(next.releaseDocument().getSortKeyMetaField(), expectedSortKey);
+}
+
+TEST_F(ChangeStreamStageTest, UsesOldSortKeyFormatIfMergeByPBRTIsFalse) {
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("x" << 2 << "_id" << 1), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ boost::none); // o2
+
+ auto stages = makeStages(insert.toBSON(), kDefaultSpec);
+
+ getExpCtx()->mongoProcessInterface =
+ std::make_unique<MockMongoInterface>(std::vector<FieldPath>{{"x"}, {"_id"}});
+
+ getExpCtx()->mergeByPBRT = false;
+ getExpCtx()->needsMerge = true;
+
+ auto next = stages.back()->getNext();
+
+ auto expectedSortKey =
+ BSON("" << kDefaultTs << "" << testUuid() << "" << BSON("x" << 2 << "_id" << 1));
+
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_BSONOBJ_EQ(next.releaseDocument().getSortKeyMetaField(), expectedSortKey);
+}
+
//
// Test class for change stream of a single database.
//
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 9467efad0dd..08c4b5f7772 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -355,7 +355,8 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear
// in the output.
- ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey);
+ auto resumeTokenData = getResumeToken(ts, uuid, documentKey);
+ auto resumeToken = ResumeToken(resumeTokenData).toDocument();
// Add some additional fields only relevant to transactions.
if (_txnContext) {
@@ -364,15 +365,20 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid));
}
- doc.addField(DocumentSourceChangeStream::kIdField,
- Value(ResumeToken(resumeTokenData).toDocument()));
+ doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken));
doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType));
doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime));
- // If we're in a sharded environment, we'll need to merge the results by their sort key, so add
- // that as metadata.
- if (pExpCtx->needsMerge) {
+ // We set the resume token as the document's sort key in both the sharded and non-sharded cases,
+ // since we will subsequently rely upon it to generate a correct postBatchResumeToken.
+ // TODO SERVER-38539: when returning results for merging, we first check whether 'mergeByPBRT'
+ // has been set. If not, then the request was sent from an older mongoS which cannot merge by
+ // raw resume tokens, and we must use the old sort key format. This check, and the 'mergeByPBRT'
+ // flag, are no longer necessary in 4.4; all change streams will be merged by resume token.
+ if (pExpCtx->needsMerge && !pExpCtx->mergeByPBRT) {
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
+ } else {
+ doc.setSortKeyMetaField(resumeToken.toBson());
}
// "invalidate" and "newShardDetected" entries have fewer fields.
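
The sort-key format selection above reduces to a single predicate; a condensed sketch (the two flags mirror the ExpressionContext members added in this patch):

// Only an old mongoS (needsMerge set, mergeByPBRT unset) still requires the
// legacy {ts, uuid, documentKey} array format; every other combination gets
// the raw resume token, which is what the PBRT machinery consumes.
bool useLegacySortKey(bool needsMerge, bool mergeByPBRT) {
    return needsMerge && !mergeByPBRT;
}

The three new unit tests in document_source_change_stream_test.cpp above exercise exactly the three reachable combinations of these flags.
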
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
index 6a86fb8491e..f851a642f18 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -90,14 +90,23 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
if (isInvalidatingCommand(pExpCtx, operationType) && !_ignoreFirstInvalidate) {
auto resumeTokenData = ResumeToken::parse(doc[DSCS::kIdField].getDocument()).getData();
resumeTokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
+ auto resumeTokenDoc = ResumeToken(resumeTokenData).toDocument();
- MutableDocument result(Document{{DSCS::kIdField, ResumeToken(resumeTokenData).toDocument()},
+ MutableDocument result(Document{{DSCS::kIdField, resumeTokenDoc},
{DSCS::kOperationTypeField, DSCS::kInvalidateOpType},
{DSCS::kClusterTimeField, doc[DSCS::kClusterTimeField]}});
- // If we're in a sharded environment, we'll need to merge the results by their sort key, so
- // add that as metadata.
+ // We set the resume token as the document's sort key in both the sharded and non-sharded
+ // cases, since we will later rely upon it to generate a correct postBatchResumeToken. We
+ // must therefore update the sort key to match the new resume token that we generated above.
+ // TODO SERVER-38539: when returning results for merging, we check whether 'mergeByPBRT' has
+ // been set. If not, then the request was sent from an older mongoS which cannot merge by
+ // raw resume tokens, and the sort key should therefore be left alone. The 'mergeByPBRT'
+ // flag is no longer necessary in 4.4; all change streams will be merged by resume token.
result.copyMetaDataFrom(doc);
+ if (!pExpCtx->needsMerge || pExpCtx->mergeByPBRT) {
+ result.setSortKeyMetaField(resumeTokenDoc.toBson());
+ }
_queuedInvalidate = result.freeze();
}
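
A sketch of the invalidate path with illustrative types: the synthesized event carries a re-encoded token, so the sort key copied from the source event must be regenerated to match, except when an older mongoS expects the legacy key to pass through untouched.

#include <string>

struct Doc {
    std::string id;       // resume token, i.e. the _id field
    std::string sortKey;  // sort key metadata
};

Doc makeInvalidateSketch(const Doc& source, bool needsMerge, bool mergeByPBRT) {
    Doc result;
    result.id = source.id + "|fromInvalidate";  // stands in for setting FromInvalidate::kFromInvalidate
    result.sortKey = source.sortKey;            // analogous to result.copyMetaDataFrom(doc)
    if (!needsMerge || mergeByPBRT) {
        result.sortKey = result.id;  // keep the sort key in lockstep with the new token
    }
    return result;
}
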
diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp
index 4d5271727f1..78348e8154d 100644
--- a/src/mongo/db/pipeline/document_source_replace_root.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root.cpp
@@ -73,7 +73,9 @@ public:
newRoot.getType() == Object);
// Turn the value into a document.
- return newRoot.getDocument();
+ MutableDocument newDoc(newRoot.getDocument());
+ newDoc.copyMetaDataFrom(input);
+ return newDoc.freeze();
}
// Optimize the newRoot expression.
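
Why this matters: $replaceRoot swaps out the entire document, and before this change the replacement dropped the metadata, including the sort key that change-stream merging and the PBRT now depend on. A stand-in sketch of the fix:

#include <map>
#include <string>
#include <utility>

struct DocWithMeta {
    std::map<std::string, std::string> fields;
    std::string sortKeyMeta;  // would be lost if only 'fields' were returned
};

DocWithMeta replaceRootSketch(const DocWithMeta& input,
                              std::map<std::string, std::string> newRoot) {
    DocWithMeta out;
    out.fields = std::move(newRoot);
    out.sortKeyMeta = input.sortKeyMeta;  // analogous to newDoc.copyMetaDataFrom(input)
    return out;
}
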
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 4f14584ae21..3fd1a677fdd 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -54,6 +54,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
comment = request.getComment();
fromMongos = request.isFromMongos();
needsMerge = request.needsMerge();
+ mergeByPBRT = request.mergeByPBRT();
allowDiskUse = request.shouldAllowDiskUse();
bypassDocumentValidation = request.shouldBypassDocumentValidation();
ns = request.getNamespaceString();
@@ -144,6 +145,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->explain = explain;
expCtx->comment = comment;
expCtx->needsMerge = needsMerge;
+ expCtx->mergeByPBRT = mergeByPBRT;
expCtx->fromMongos = fromMongos;
expCtx->inMongos = inMongos;
expCtx->allowDiskUse = allowDiskUse;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 25293100e79..f78a04012f3 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -205,6 +205,7 @@ public:
bool fromMongos = false;
bool needsMerge = false;
+ bool mergeByPBRT = false;
bool inMongos = false;
bool allowDiskUse = false;
bool bypassDocumentValidation = false;
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 455a6d35c6e..53cf418e36a 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -275,6 +275,7 @@ BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShard
BSONObj MongoSInterface::createCommandForTargetedShards(
OperationContext* opCtx,
const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
const cluster_aggregation_planner::SplitPipeline& splitPipeline,
const BSONObj collationObj,
const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
@@ -293,6 +294,11 @@ BSONObj MongoSInterface::createCommandForTargetedShards(
if (needsMerge) {
targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+ // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This
+ // notifies the shards that the mongoS is capable of merging streams based on resume token.
+ // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
+ targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream());
+
// For split pipelines which need merging, do *not* propagate the writeConcern to the shards
// part. Otherwise this is part of an exchange and in that case we should include the
// writeConcern.
@@ -364,7 +370,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
+ const LiteParsedPipeline& litePipe,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
BSONObj collationObj) {
// The process is as follows:
@@ -388,7 +394,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe
// If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
// Otherwise, uassert on all exceptions here.
- if (!(liteParsedPipeline.hasChangeStream() &&
+ if (!(litePipe.hasChangeStream() &&
executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
uassertStatusOK(executionNsRoutingInfoStatus);
}
@@ -398,7 +404,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe
: boost::optional<CachedCollectionRoutingInfo>{};
// Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe);
std::set<ShardId> shardIds = getTargetedShards(
opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
@@ -423,7 +429,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe
// Generate the command object for the targeted shards.
BSONObj targetedCommand = splitPipeline
? createCommandForTargetedShards(
- opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
+ opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true)
: createPassthroughCommandForShard(
opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
@@ -466,7 +472,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe
} else {
cursors = establishShardCursors(opCtx,
executionNss,
- liteParsedPipeline,
+ litePipe,
executionNsRoutingInfo,
targetedCommand,
aggRequest,
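
The shard-command construction reduces to the following shape; a loose sketch using a plain map in place of BSON (field names from the patch, everything else illustrative):

#include <map>
#include <string>

std::map<std::string, bool> targetedShardFlags(bool needsMerge, bool hasChangeStream) {
    std::map<std::string, bool> cmd;
    if (needsMerge) {
        cmd["needsMerge"] = true;
        // Advertise PBRT-based merging to the shards only for change streams;
        // an older mongoS without this patch never sets the flag, so shards
        // fall back to the legacy {ts, uuid, documentKey} sort-key format.
        cmd["mergeByPBRT"] = hasChangeStream;
    }
    return cmd;
}
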
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 6d851abd2e5..1bdcae87595 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -95,6 +95,7 @@ public:
static BSONObj createCommandForTargetedShards(
OperationContext* opCtx,
const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
const cluster_aggregation_planner::SplitPipeline& splitPipeline,
const BSONObj collationObj,
const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
@@ -116,7 +117,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
+ const LiteParsedPipeline& litePipe,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
BSONObj collationObj);