From 16c7973fc8037ee0b7bd12116e5ff25084af142b Mon Sep 17 00:00:00 2001 From: Bernard Gorman Date: Sun, 25 Nov 2018 03:37:30 +0000 Subject: SERVER-38410 Allow ARM to consume postBatchResumeToken --- src/mongo/db/exec/change_stream_proxy.cpp | 6 +- src/mongo/db/pipeline/aggregation_request.cpp | 10 +++ src/mongo/db/pipeline/aggregation_request.h | 22 +++++++ src/mongo/db/pipeline/aggregation_request_test.cpp | 19 ++++-- src/mongo/db/pipeline/change_stream_constants.h | 3 +- .../document_source_change_stream_test.cpp | 75 ++++++++++++++++++++++ .../document_source_change_stream_transform.cpp | 18 ++++-- .../pipeline/document_source_check_invalidate.cpp | 15 ++++- .../db/pipeline/document_source_replace_root.cpp | 4 +- src/mongo/db/pipeline/expression_context.cpp | 2 + src/mongo/db/pipeline/expression_context.h | 1 + src/mongo/db/pipeline/mongos_process_interface.cpp | 16 +++-- src/mongo/db/pipeline/mongos_process_interface.h | 3 +- 13 files changed, 168 insertions(+), 26 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp index 5af30b4b5a3..3fc83586e37 100644 --- a/src/mongo/db/exec/change_stream_proxy.cpp +++ b/src/mongo/db/exec/change_stream_proxy.cpp @@ -51,12 +51,10 @@ ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx, boost::optional ChangeStreamProxyStage::getNextBson() { if (auto next = _pipeline->getNext()) { // While we have more results to return, we track both the timestamp and the resume token of - // the latest event observed in the oplog, the latter via its _id field. + // the latest event observed in the oplog, the latter via its sort key metadata field. auto nextBSON = (_includeMetaData ? next->toBsonWithMetaData() : next->toBson()); _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get()); - if (next->getField("_id").getType() == BSONType::Object) { - _postBatchResumeToken = next->getField("_id").getDocument().toBson(); - } + _postBatchResumeToken = next->getSortKeyMetaField(); return nextBSON; } diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 9a45b38ec17..e1fb48d7634 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -54,6 +54,7 @@ constexpr StringData AggregationRequest::kCursorName; constexpr StringData AggregationRequest::kBatchSizeName; constexpr StringData AggregationRequest::kFromMongosName; constexpr StringData AggregationRequest::kNeedsMergeName; +constexpr StringData AggregationRequest::kMergeByPBRTName; constexpr StringData AggregationRequest::kPipelineName; constexpr StringData AggregationRequest::kCollationName; constexpr StringData AggregationRequest::kExplainName; @@ -204,6 +205,14 @@ StatusWith AggregationRequest::parseFromBSON( hasNeedsMergeElem = true; request.setNeedsMerge(elem.Bool()); + } else if (kMergeByPBRTName == fieldName) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kMergeByPBRTName << " must be a boolean, not a " + << typeName(elem.type())}; + } + + request.setMergeByPBRT(elem.Bool()); } else if (kAllowDiskUseName == fieldName) { if (storageGlobalParams.readOnly) { return {ErrorCodes::IllegalOperation, @@ -313,6 +322,7 @@ Document AggregationRequest::serializeToCommandObj() const { {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()}, {kFromMongosName, _fromMongos ? Value(true) : Value()}, {kNeedsMergeName, _needsMerge ? Value(true) : Value()}, + {kMergeByPBRTName, _mergeByPBRT ? Value(true) : Value()}, {bypassDocumentValidationCommandOption(), _bypassDocumentValidation ? Value(true) : Value()}, // Only serialize a collation if one was specified. diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 22134763ac9..cac78b57050 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -56,6 +56,7 @@ public: static constexpr StringData kBatchSizeName = "batchSize"_sd; static constexpr StringData kFromMongosName = "fromMongos"_sd; static constexpr StringData kNeedsMergeName = "needsMerge"_sd; + static constexpr StringData kMergeByPBRTName = "mergeByPBRT"_sd; static constexpr StringData kPipelineName = "pipeline"_sd; static constexpr StringData kCollationName = "collation"_sd; static constexpr StringData kExplainName = "explain"_sd; @@ -152,6 +153,22 @@ public: return _needsMerge; } + /** + * Returns true if this request is a change stream pipeline which originated from a mongoS that + * can merge based on the documents' raw resume tokens and the 'postBatchResumeToken' field. If + * not, then the mongoD will need to produce the old {ts, uuid, docKey} $sortKey format instead. + * TODO SERVER-38539: this flag is no longer necessary in 4.4, since all change streams will be + * merged using raw resume tokens and PBRTs. This mechanism was chosen over FCV for two reasons: + * first, because this code is intended for backport to 4.0, where the same issue exists but FCV + * cannot be leveraged. Secondly, FCV can be changed at any point during runtime, but mongoS + * cannot dynamically switch from one $sortKey format to another mid-stream. The 'mergeByPBRT' + * flag allows the mongoS to dictate which $sortKey format will be used, and it will stay + * consistent for the entire duration of the stream. + */ + bool mergeByPBRT() const { + return _mergeByPBRT; + } + bool shouldAllowDiskUse() const { return _allowDiskUse; } @@ -239,6 +256,10 @@ public: _needsMerge = needsMerge; } + void setMergeByPBRT(bool mergeByPBRT) { + _mergeByPBRT = mergeByPBRT; + } + void setBypassDocumentValidation(bool shouldBypassDocumentValidation) { _bypassDocumentValidation = shouldBypassDocumentValidation; } @@ -299,6 +320,7 @@ private: bool _allowDiskUse = false; bool _fromMongos = false; bool _needsMerge = false; + bool _mergeByPBRT = false; bool _bypassDocumentValidation = false; // A user-specified maxTimeMS limit, or a value of '0' if not specified. diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index 502319d12dc..7b6e90008d5 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -59,15 +59,16 @@ TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson( "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, " - "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: " - "{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, " - "$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', exchange: {policy: " - "'roundrobin', consumers:NumberInt(2)}}"); + "needsMerge: true, mergeByPBRT: true, bypassDocumentValidation: true, collation: {locale: " + "'en_US'}, cursor: {batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: " + "'linearizable'}, $queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', " + "exchange: {policy: 'roundrobin', consumers:NumberInt(2)}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); ASSERT_TRUE(request.shouldAllowDiskUse()); ASSERT_TRUE(request.isFromMongos()); ASSERT_TRUE(request.needsMerge()); + ASSERT_TRUE(request.mergeByPBRT()); ASSERT_TRUE(request.shouldBypassDocumentValidation()); ASSERT_EQ(request.getBatchSize(), 10); ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1)); @@ -155,6 +156,7 @@ TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefau request.setAllowDiskUse(false); request.setFromMongos(false); request.setNeedsMerge(false); + request.setMergeByPBRT(false); request.setBypassDocumentValidation(false); request.setCollation(BSONObj()); request.setHint(BSONObj()); @@ -176,6 +178,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { request.setAllowDiskUse(true); request.setFromMongos(true); request.setNeedsMerge(true); + request.setMergeByPBRT(true); request.setBypassDocumentValidation(true); request.setBatchSize(10); request.setMaxTimeMS(10u); @@ -199,6 +202,7 @@ TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { {AggregationRequest::kAllowDiskUseName, true}, {AggregationRequest::kFromMongosName, true}, {AggregationRequest::kNeedsMergeName, true}, + {AggregationRequest::kMergeByPBRTName, true}, {bypassDocumentValidationCommandOption(), true}, {AggregationRequest::kCollationName, collationObj}, {AggregationRequest::kCursorName, @@ -377,6 +381,13 @@ TEST(AggregationRequestTest, ShouldRejectFromMongosIfNeedsMerge34AlsoPresent) { ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } +TEST(AggregationRequestTest, ShouldRejectNonBoolMergeByPBRT) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson( + "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, mergeByPBRT: 1, fromMongos: true}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) { NamespaceString nss("a.collection"); const BSONObj inputBson = diff --git a/src/mongo/db/pipeline/change_stream_constants.h b/src/mongo/db/pipeline/change_stream_constants.h index 58488054518..71d1c3f2305 100644 --- a/src/mongo/db/pipeline/change_stream_constants.h +++ b/src/mongo/db/pipeline/change_stream_constants.h @@ -36,8 +36,7 @@ namespace mongo { namespace change_stream_constants { -const BSONObj kSortSpec = - BSON("_id.clusterTime.ts" << 1 << "_id.uuid" << 1 << "_id.documentKey" << 1); +const BSONObj kSortSpec = BSON("_id._data" << 1); } // namespace change_stream_constants } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 418bd7ee8a6..1fffad0e43b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1225,6 +1225,81 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) { ErrorCodes::InvalidResumeToken); } +TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) { + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("x" << 2 << "_id" << 1), // o + testUuid(), // uuid + boost::none, // fromMigrate + boost::none); // o2 + + auto stages = makeStages(insert.toBSON(), kDefaultSpec); + + getExpCtx()->mongoProcessInterface = + std::make_unique(std::vector{{"x"}, {"_id"}}); + + getExpCtx()->mergeByPBRT = false; + getExpCtx()->needsMerge = false; + + auto next = stages.back()->getNext(); + + auto expectedSortKey = + makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)).toBson(); + + ASSERT_TRUE(next.isAdvanced()); + ASSERT_BSONOBJ_EQ(next.releaseDocument().getSortKeyMetaField(), expectedSortKey); +} + +TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfMergeByPBRTIsTrue) { + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("x" << 2 << "_id" << 1), // o + testUuid(), // uuid + boost::none, // fromMigrate + boost::none); // o2 + + auto stages = makeStages(insert.toBSON(), kDefaultSpec); + + getExpCtx()->mongoProcessInterface = + std::make_unique(std::vector{{"x"}, {"_id"}}); + + getExpCtx()->mergeByPBRT = true; + getExpCtx()->needsMerge = true; + + auto next = stages.back()->getNext(); + + auto expectedSortKey = + makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)).toBson(); + + ASSERT_TRUE(next.isAdvanced()); + ASSERT_BSONOBJ_EQ(next.releaseDocument().getSortKeyMetaField(), expectedSortKey); +} + +TEST_F(ChangeStreamStageTest, UsesOldSortKeyFormatIfMergeByPBRTIsFalse) { + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("x" << 2 << "_id" << 1), // o + testUuid(), // uuid + boost::none, // fromMigrate + boost::none); // o2 + + auto stages = makeStages(insert.toBSON(), kDefaultSpec); + + getExpCtx()->mongoProcessInterface = + std::make_unique(std::vector{{"x"}, {"_id"}}); + + getExpCtx()->mergeByPBRT = false; + getExpCtx()->needsMerge = true; + + auto next = stages.back()->getNext(); + + auto expectedSortKey = + BSON("" << kDefaultTs << "" << testUuid() << "" << BSON("x" << 2 << "_id" << 1)); + + ASSERT_TRUE(next.isAdvanced()); + ASSERT_BSONOBJ_EQ(next.releaseDocument().getSortKeyMetaField(), expectedSortKey); +} + // // Test class for change stream of a single database. // diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 9467efad0dd..08c4b5f7772 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -355,7 +355,8 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // Note that 'documentKey' and/or 'uuid' might be missing, in which case they will not appear // in the output. - ResumeTokenData resumeTokenData = getResumeToken(ts, uuid, documentKey); + auto resumeTokenData = getResumeToken(ts, uuid, documentKey); + auto resumeToken = ResumeToken(resumeTokenData).toDocument(); // Add some additional fields only relevant to transactions. if (_txnContext) { @@ -364,15 +365,20 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid)); } - doc.addField(DocumentSourceChangeStream::kIdField, - Value(ResumeToken(resumeTokenData).toDocument())); + doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken)); doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType)); doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime)); - // If we're in a sharded environment, we'll need to merge the results by their sort key, so add - // that as metadata. - if (pExpCtx->needsMerge) { + // We set the resume token as the document's sort key in both the sharded and non-sharded cases, + // since we will subsequently rely upon it to generate a correct postBatchResumeToken. + // TODO SERVER-38539: when returning results for merging, we first check whether 'mergeByPBRT' + // has been set. If not, then the request was sent from an older mongoS which cannot merge by + // raw resume tokens, and we must use the old sort key format. This check, and the 'mergeByPBRT' + // flag, are no longer necessary in 4.4; all change streams will be merged by resume token. + if (pExpCtx->needsMerge && !pExpCtx->mergeByPBRT) { doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); + } else { + doc.setSortKeyMetaField(resumeToken.toBson()); } // "invalidate" and "newShardDetected" entries have fewer fields. diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp index 6a86fb8491e..f851a642f18 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp @@ -90,14 +90,23 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() { if (isInvalidatingCommand(pExpCtx, operationType) && !_ignoreFirstInvalidate) { auto resumeTokenData = ResumeToken::parse(doc[DSCS::kIdField].getDocument()).getData(); resumeTokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate; + auto resumeTokenDoc = ResumeToken(resumeTokenData).toDocument(); - MutableDocument result(Document{{DSCS::kIdField, ResumeToken(resumeTokenData).toDocument()}, + MutableDocument result(Document{{DSCS::kIdField, resumeTokenDoc}, {DSCS::kOperationTypeField, DSCS::kInvalidateOpType}, {DSCS::kClusterTimeField, doc[DSCS::kClusterTimeField]}}); - // If we're in a sharded environment, we'll need to merge the results by their sort key, so - // add that as metadata. + // We set the resume token as the document's sort key in both the sharded and non-sharded + // cases, since we will later rely upon it to generate a correct postBatchResumeToken. We + // must therefore update the sort key to match the new resume token that we generated above. + // TODO SERVER-38539: when returning results for merging, we check whether 'mergeByPBRT' has + // been set. If not, then the request was sent from an older mongoS which cannot merge by + // raw resume tokens, and the sort key should therefore be left alone. The 'mergeByPBRT' + // flag is no longer necessary in 4.4; all change streams will be merged by resume token. result.copyMetaDataFrom(doc); + if (!pExpCtx->needsMerge || pExpCtx->mergeByPBRT) { + result.setSortKeyMetaField(resumeTokenDoc.toBson()); + } _queuedInvalidate = result.freeze(); } diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp index 4d5271727f1..78348e8154d 100644 --- a/src/mongo/db/pipeline/document_source_replace_root.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root.cpp @@ -73,7 +73,9 @@ public: newRoot.getType() == Object); // Turn the value into a document. - return newRoot.getDocument(); + MutableDocument newDoc(newRoot.getDocument()); + newDoc.copyMetaDataFrom(input); + return newDoc.freeze(); } // Optimize the newRoot expression. diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 4f14584ae21..3fd1a677fdd 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -54,6 +54,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, comment = request.getComment(); fromMongos = request.isFromMongos(); needsMerge = request.needsMerge(); + mergeByPBRT = request.mergeByPBRT(); allowDiskUse = request.shouldAllowDiskUse(); bypassDocumentValidation = request.shouldBypassDocumentValidation(); ns = request.getNamespaceString(); @@ -144,6 +145,7 @@ intrusive_ptr ExpressionContext::copyWith( expCtx->explain = explain; expCtx->comment = comment; expCtx->needsMerge = needsMerge; + expCtx->mergeByPBRT = mergeByPBRT; expCtx->fromMongos = fromMongos; expCtx->inMongos = inMongos; expCtx->allowDiskUse = allowDiskUse; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 25293100e79..f78a04012f3 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -205,6 +205,7 @@ public: bool fromMongos = false; bool needsMerge = false; + bool mergeByPBRT = false; bool inMongos = false; bool allowDiskUse = false; bool bypassDocumentValidation = false; diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 455a6d35c6e..53cf418e36a 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -275,6 +275,7 @@ BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShard BSONObj MongoSInterface::createCommandForTargetedShards( OperationContext* opCtx, const AggregationRequest& request, + const LiteParsedPipeline& litePipe, const cluster_aggregation_planner::SplitPipeline& splitPipeline, const BSONObj collationObj, const boost::optional exchangeSpec, @@ -293,6 +294,11 @@ BSONObj MongoSInterface::createCommandForTargetedShards( if (needsMerge) { targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + // If this is a change stream aggregation, set the 'mergeByPBRT' flag on the command. This + // notifies the shards that the mongoS is capable of merging streams based on resume token. + // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. + targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream()); + // For split pipelines which need merging, do *not* propagate the writeConcern to the shards // part. Otherwise this is part of an exchange and in that case we should include the // writeConcern. @@ -364,7 +370,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe const boost::intrusive_ptr& expCtx, const NamespaceString& executionNss, const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, + const LiteParsedPipeline& litePipe, std::unique_ptr pipeline, BSONObj collationObj) { // The process is as follows: @@ -388,7 +394,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. // Otherwise, uassert on all exceptions here. - if (!(liteParsedPipeline.hasChangeStream() && + if (!(litePipe.hasChangeStream() && executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { uassertStatusOK(executionNsRoutingInfoStatus); } @@ -398,7 +404,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe : boost::optional{}; // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline); + const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe); std::set shardIds = getTargetedShards( opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); @@ -423,7 +429,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe // Generate the command object for the targeted shards. BSONObj targetedCommand = splitPipeline ? createCommandForTargetedShards( - opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) + opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true) : createPassthroughCommandForShard( opCtx, aggRequest, boost::none, pipeline.get(), collationObj); @@ -466,7 +472,7 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe } else { cursors = establishShardCursors(opCtx, executionNss, - liteParsedPipeline, + litePipe, executionNsRoutingInfo, targetedCommand, aggRequest, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 6d851abd2e5..1bdcae87595 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -95,6 +95,7 @@ public: static BSONObj createCommandForTargetedShards( OperationContext* opCtx, const AggregationRequest& request, + const LiteParsedPipeline& litePipe, const cluster_aggregation_planner::SplitPipeline& splitPipeline, const BSONObj collationObj, const boost::optional exchangeSpec, @@ -116,7 +117,7 @@ public: const boost::intrusive_ptr& expCtx, const NamespaceString& executionNss, const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, + const LiteParsedPipeline& litePipe, std::unique_ptr pipeline, BSONObj collationObj); -- cgit v1.2.1