author | Rishab Joshi <rishab.joshi@mongodb.com> | 2021-06-17 21:05:30 +0000
---|---|---
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-23 23:12:44 +0000
commit | 1c65954179826c76a0922e35bd4429600a1448f6 (patch) |
tree | ec2c8bdb7fca607a20b883dda7db558e381b6eae |
parent | 8bd1e746079a416eac1611fae01b8b3efa4b92e0 (diff) |
download | mongo-1c65954179826c76a0922e35bd4429600a1448f6.tar.gz |
SERVER-56871: Allow $match and $project to get pushed down to the shards for change streams.
21 files changed, 747 insertions, 54 deletions
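The headline change: user-supplied $match stages, as well as every DocumentSourceSingleDocumentTransformation stage ($project, $addFields, $set, $unset, $replaceRoot, $replaceWith), can now swap ahead of the internal mongoS-only change stream stages, so the sharded-cluster pipeline split leaves them on the shards. As a hedged sketch of how to observe the effect from the shell: the splitPipeline.shardsPart/mergerPart fields below are the usual sharded-aggregation explain layout rather than anything this patch adds, and featureFlagChangeStreamsOptimization (the flag the unit tests below toggle) must be enabled.

```javascript
// Sketch: with the optimization flag on, a user $match on a sharded change stream
// should land in the shard half of the split pipeline, not the mongoS merging half.
const explain = db.collection.explain().aggregate(
    [{$changeStream: {}}, {$match: {operationType: "insert"}}]);
// Expect the $match among the shard-side stages (splitPipeline.shardsPart),
// with $_internalChangeStreamHandleTopologyChange left on the merger side.
printjson(explain.splitPipeline);
```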
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js index 4abc2f06ffb..531690afd19 100644 --- a/jstests/change_streams/collation.js +++ b/jstests/change_streams/collation.js @@ -240,9 +240,9 @@ assertDropCollection(db, collName); // Test that a $changeStream is allowed to resume on the dropped collection with an explicit // collation, even if it doesn't match the original collection's default collation. -changeStream = - caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}], - {resumeAfter: resumeToken, collation: {locale: "simple"}}); +changeStream = caseInsensitiveCollection.watch( + [{$match: {$or: [{"_id": resumeToken}, {"fullDocument.text": "ABC"}]}}], + {resumeAfter: resumeToken, collation: {locale: "simple"}}); assert.soon(() => changeStream.hasNext()); assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); @@ -304,9 +304,9 @@ if (!isChangeStreamPassthrough()) { // Test that a pipeline with an explicit collation is allowed to resume from before the // collection is dropped and recreated. -changeStream = - caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}], - {resumeAfter: resumeToken, collation: {locale: "fr"}}); +changeStream = caseInsensitiveCollection.watch( + [{$match: {$or: [{"_id": resumeToken}, {"fullDocument.text": "ABC"}]}}], + {resumeAfter: resumeToken, collation: {locale: "fr"}}); assert.soon(() => changeStream.hasNext()); assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index edf55161c6a..de7cf737280 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -131,8 +131,9 @@ if (!isMongos) { } // Only watch the "update" changes of the specific doc since the beginning. - changeStreamCursor = coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}], - {resumeAfter: resumeToken, batchSize: 2}); + changeStreamCursor = coll.watch( + [{$match: {$or: [{_id: resumeToken}, {documentKey: {_id: 1}, operationType: "update"}]}}], + {resumeAfter: resumeToken, batchSize: 2}); // Check the first batch. assert.eq(changeStreamCursor.objsLeftInBatch(), 2); diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js index 8d0fc941020..078b85261a7 100644 --- a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js +++ b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js @@ -80,11 +80,12 @@ assert.throwsWithCode(function() { // don't trip the validation checks for the existence of the pre-image. for (let runOnDB of [testDB, adminDB]) { // Open a whole-db or whole-cluster stream that filters for the 'collWithPreImages' namespace. - const csCursor = runOnDB.watch([{$match: {"ns.coll": collWithPreImages.getName()}}], { - fullDocumentBeforeChange: "required", - resumeAfter: resumeToken, - allChangesForCluster: (runOnDB === adminDB) - }); + const csCursor = runOnDB.watch( + [{$match: {$or: [{_id: resumeToken}, {"ns.coll": collWithPreImages.getName()}]}}], { + fullDocumentBeforeChange: "required", + resumeAfter: resumeToken, + allChangesForCluster: (runOnDB === adminDB) + }); // The list of events and pre-images that we expect to see in the stream. 
const expectedPreImageEvents = [ diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 0fff6070ba3..dbbf223f0ba 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -138,7 +138,7 @@ Value DocumentSourceChangeStreamTransform::serializeLegacy( _changeStreamSpec.getResumeAfter() || _changeStreamSpec.getStartAtOperationTime() || _changeStreamSpec.getStartAfter()); - return Value(Document{{getSourceName(), _changeStreamSpec.toBSON()}}); + return Value(Document{{DocumentSourceChangeStream::kStageName, _changeStreamSpec.toBSON()}}); } Value DocumentSourceChangeStreamCheckInvalidate::serializeLegacy( diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 997f8c1f06a..3c183d9e847 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -34,9 +34,12 @@ #include "mongo/db/commands/feature_compatibility_version_documentation.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/matcher/expression_algo.h" +#include "mongo/db/pipeline/document_source_add_fields.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_internal_shard_filter.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/expression_context.h" @@ -239,13 +242,25 @@ bool DocumentSource::pushSampleBefore(Pipeline::SourceContainer::iterator itr, return false; } +bool DocumentSource::pushSingleDocumentTransformBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + auto singleDocTransform = + dynamic_cast<DocumentSourceSingleDocumentTransformation*>((*std::next(itr)).get()); + + if (constraints().canSwapWithSingleDocTransform && singleDocTransform) { + container->insert(itr, std::move(singleDocTransform)); + container->erase(std::next(itr)); + return true; + } + return false; +} + Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); - // Attempt to swap 'itr' with a subsequent $match or subsequent $sample. - if (std::next(itr) != container->end() && - (pushMatchBefore(itr, container) || pushSampleBefore(itr, container))) { + // Attempt to swap 'itr' with a subsequent stage, if applicable. + if (attemptToPushStageBefore(itr, container)) { // The stage before the pushed before stage may be able to optimize further, if there is // such a stage. return std::prev(itr) == container->begin() ? std::prev(itr) : std::prev(std::prev(itr)); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 9bc9f1cab2c..49cc5cddcfd 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -442,6 +442,31 @@ private: bool pushSampleBefore(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container); + /** + * Attempts to push any kind of 'DocumentSourceSingleDocumentTransformation' stage directly + * ahead of the stage present at the 'itr' position if matches the constraints. 
Returns true if + * optimization was performed, false otherwise. + * + * Note that this optimization is oblivious to the transform function. The only stages that are + * eligible to swap are those that can safely swap with any transform. + */ + bool pushSingleDocumentTransformBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container); + + /** + * Wraps various optimization methods and returns the call immediately if any one of them + * returns true. + */ + bool attemptToPushStageBefore(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + if (std::next(itr) == container->end()) { + return false; + } + + return pushMatchBefore(itr, container) || pushSampleBefore(itr, container) || + pushSingleDocumentTransformBefore(itr, container); + } + public: /** * The non-virtual public interface for optimization. Attempts to do some generic optimizations diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp index 7ea37e67da8..4ba82351811 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp @@ -31,6 +31,8 @@ #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" +#include "mongo/db/query/query_feature_flags_gen.h" + namespace mongo { DocumentSourceChangeStreamEnsureResumeTokenPresent:: @@ -53,6 +55,34 @@ const char* DocumentSourceChangeStreamEnsureResumeTokenPresent::getSourceName() return kStageName.rawData(); } + +StageConstraints DocumentSourceChangeStreamEnsureResumeTokenPresent::constraints( + Pipeline::SplitState) const { + StageConstraints constraints{StreamType::kStreaming, + PositionRequirement::kNone, + // If this is parsed on mongos it should stay on mongos. If we're + // not in a sharded cluster then it's okay to run on mongod. + HostTypeRequirement::kLocalOnly, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + + // The '$match' and 'DocumentSourceSingleDocumentTransformation' stages can swap with this + // stage, allowing filtering and reshaping to occur earlier in the pipeline. For sharded cluster + // pipelines, swaps can allow $match and 'DocumentSourceSingleDocumentTransformation' stages to + // execute on the shards, providing inter-node parallelism and potentially reducing the amount + // of data sent form each shard to the mongoS. + if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + constraints.canSwapWithMatch = true; + constraints.canSwapWithSingleDocTransform = true; + } + + return constraints; +} + DocumentSource::GetNextResult DocumentSourceChangeStreamEnsureResumeTokenPresent::doGetNext() { // If we have already verified the resume token is present, return the next doc immediately. 
if (_resumeStatus == ResumeStatus::kSurpassedToken) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h index 57b91107913..ae484dbff51 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h @@ -39,22 +39,15 @@ namespace mongo { class DocumentSourceChangeStreamEnsureResumeTokenPresent final : public DocumentSourceChangeStreamCheckResumability { public: - static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd; + static constexpr StringData kStageName = "$_internalChangeStreamEnsureResumeTokenPresent"_sd; const char* getSourceName() const final; - StageConstraints constraints(Pipeline::SplitState) const final { - return {StreamType::kStreaming, - PositionRequirement::kNone, - // If this is parsed on mongos it should stay on mongos. If we're not in a sharded - // cluster then it's okay to run on mongod. - HostTypeRequirement::kLocalOnly, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed, - LookupRequirement::kNotAllowed, - UnionRequirement::kNotAllowed, - ChangeStreamRequirement::kChangeStreamStage}; + StageConstraints constraints(Pipeline::SplitState) const final; + + GetModPathsReturn getModifiedPaths() const final { + // This stage neither modifies nor renames any field. + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } static boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> create( diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 962e48fe4b8..d4c1d917799 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -55,8 +55,11 @@ bool isShardConfigEvent(const Document& eventDoc) { // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event // here. We may wish to remove this mechanism entirely in 4.7+, or retain it for future cases // where a change stream is targeted to a subset of shards. See SERVER-44039 for details. - if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() == - DocumentSourceChangeStream::kNewShardDetectedOpType) { + + auto opType = eventDoc[DocumentSourceChangeStream::kOperationTypeField]; + + if (!opType.missing() && + opType.getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) { // If the failpoint is enabled, throw the 'ChangeStreamToplogyChange' exception to the // client. This is used in testing to confirm that the swallowed 'kNewShardDetected' event // has reached the mongoS. 
@@ -87,6 +90,29 @@ DocumentSourceChangeStreamHandleTopologyChange::DocumentSourceChangeStreamHandle const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(kStageName, expCtx) {} +StageConstraints DocumentSourceChangeStreamHandleTopologyChange::constraints( + Pipeline::SplitState) const { + StageConstraints constraints{StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kMongoS, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage}; + + // Can be swapped with the '$match' and 'DocumentSourceSingleDocumentTransformation' stages and + // ensures that they get pushed down to the shards, as this stage bisects the change streams + // pipeline. + if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + constraints.canSwapWithMatch = true; + constraints.canSwapWithSingleDocTransform = true; + } + + return constraints; +} + DocumentSource::GetNextResult DocumentSourceChangeStreamHandleTopologyChange::doGetNext() { // For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be // populated. We also resolve the original aggregation command from the expression context. diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h index 1d240be5e3c..339d0db2f4d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h @@ -56,20 +56,19 @@ public: static boost::intrusive_ptr<DocumentSourceChangeStreamHandleTopologyChange> create( const boost::intrusive_ptr<ExpressionContext>&); + const char* getSourceName() const final { + return kStageName.rawData(); + } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { return (explain ? Value(Document{{kStageName, Value()}}) : Value()); } - virtual StageConstraints constraints(Pipeline::SplitState) const { - return {StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kMongoS, - DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed, - TransactionRequirement::kNotAllowed, - LookupRequirement::kNotAllowed, - UnionRequirement::kNotAllowed, - ChangeStreamRequirement::kChangeStreamStage}; + StageConstraints constraints(Pipeline::SplitState) const final; + + GetModPathsReturn getModifiedPaths() const final { + // This stage neither modifies nor renames any field. + return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index 5dd28772c6c..a37869bd035 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -212,7 +212,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceChangeStreamOplogMatch::creat const char* DocumentSourceChangeStreamOplogMatch::getSourceName() const { // This is used in error reporting, particularly if we find this stage in a position other // than first, so report the name as $changeStream. 
- return DocumentSourceChangeStream::kStageName.rawData(); + return kStageName.rawData(); } StageConstraints DocumentSourceChangeStreamOplogMatch::constraints( diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index a340096f5e7..7452d9f7d98 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -267,7 +267,7 @@ public: // Check the oplog entry is transformed correctly. auto transform = stages[2].get(); ASSERT(transform); - ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName); + ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform)); // Create mock stage and insert at the front of the stages. auto mock = DocumentSourceMock::createForTest(D(entry), getExpCtx()); @@ -466,6 +466,44 @@ public: } }; +class ChangeStreamPipelineOptimizationTest : public ChangeStreamStageTest { +public: + explicit ChangeStreamPipelineOptimizationTest() : ChangeStreamStageTest() {} + + void run() { + RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsOptimization", + true); + ASSERT(getCSOptimizationFeatureFlagValue()); + ChangeStreamStageTest::run(); + } + + std::unique_ptr<Pipeline, PipelineDeleter> buildTestPipeline( + const std::vector<BSONObj>& rawPipeline) { + auto expCtx = getExpCtx(); + expCtx->ns = NamespaceString("a.collection"); + expCtx->inMongos = true; + + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + pipeline->optimizePipeline(); + + return pipeline; + } + + void assertStagesNameOrder(std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + const std::vector<std::string> expectedStages) { + ASSERT_EQ(pipeline->getSources().size(), expectedStages.size()); + + auto stagesItr = pipeline->getSources().begin(); + auto expectedStagesItr = expectedStages.begin(); + + while (expectedStagesItr != expectedStages.end()) { + ASSERT_EQ(*expectedStagesItr, stagesItr->get()->getSourceName()); + ++expectedStagesItr; + ++stagesItr; + } + } +}; + TEST_F(ChangeStreamStageTest, ShouldRejectNonObjectArg) { auto expCtx = getExpCtx(); @@ -2166,7 +2204,6 @@ template <typename Stage, typename StageSpec> void validateDocumentSourceStageSerialization( StageSpec spec, BSONObj specAsBSON, const boost::intrusive_ptr<ExpressionContext>& expCtx) { auto stage = Stage::createFromBson(specAsBSON.firstElement(), expCtx); - vector<Value> serialization; stage->serializeToArray(serialization); if (getCSOptimizationFeatureFlagValue()) { @@ -3304,5 +3341,513 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai BSON("$changeStream" << BSON("startAfter" << resumeToken))); } +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleMatch) { + // + // Tests that the single '$match' gets promoted before the '$_internalUpdateOnAddShard'. 
+ // + const std::vector<BSONObj> rawPipeline = { + fromjson("{$changeStream: {}}"), + fromjson("{$match: {operationType: 'insert'}}"), + }; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatch) { + // + // Tests that multiple '$match' gets merged and promoted before the + // '$_internalUpdateOnAddShard'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$match: {operationType: 'insert'}}"), + fromjson("{$match: {operationType: 'delete'}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatchAndResumeToken) { + // + // Tests that multiple '$match' gets merged and promoted before the + // '$_internalUpdateOnAddShard' if resume token if present. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$match" << BSON("operationType" + << "insert")), + BSON("$match" << BSON("operationType" + << "insert"))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleProject) { + // + // Tests that the single'$project' gets promoted before the '$_internalUpdateOnAddShard'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$project: {operationType: 1}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$project", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProject) { + // + // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard'. 
+ // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$project: {operationType: 1}}"), + fromjson("{$project: {fullDocument: 1}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$project", + "$project", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProjectAndResumeToken) { + // + // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard' if + // resume token is present. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$project" << BSON("operationType" << 1)), + BSON("$project" << BSON("fullDocument" << 1))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$project", + "$project", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithProjectMatchAndResumeToken) { + // + // Tests that a '$project' followed by a '$match' gets optimized and they get promoted before + // the '$_internalUpdateOnAddShard'. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$project" << BSON("operationType" << 1)), + BSON("$match" << BSON("operationType" + << "insert"))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$project", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleUnset) { + // + // Tests that the single'$unset' gets promoted before the '$_internalUpdateOnAddShard' as + // '$project'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$unset: 'operationType'}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$project", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleUnset) { + // + // Tests that multiple '$unset' gets promoted before the '$_internalUpdateOnAddShard' as + // '$project'. 
+ // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$unset: 'operationType'}"), + fromjson("{$unset: 'fullDocument'}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$project", + "$project", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithUnsetAndResumeToken) { + // + // Tests that the '$unset' gets promoted before the '$_internalUpdateOnAddShard' as '$project' + // even if resume token is present. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = {BSON("$changeStream" + << BSON("resumeAfter" << resumeToken)), + BSON("$unset" + << "operationType")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$project", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleAddFields) { + // + // Tests that the single'$addFields' gets promoted before the '$_internalUpdateOnAddShard'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$addFields: {stockPrice: 100}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$addFields", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleAddFields) { + // + // Tests that multiple '$addFields' gets promoted before the '$_internalUpdateOnAddShard'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$addFields: {stockPrice: 100}}"), + fromjson("{$addFields: {quarter: 'Q1'}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$addFields", + "$addFields", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAddFieldsAndResumeToken) { + // + // Tests that the '$addFields' gets promoted before the '$_internalUpdateOnAddShard' if + // resume token is present. 
+ // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$addFields" << BSON("stockPrice" << 100))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$addFields", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleSet) { + // + // Tests that the single'$set' gets promoted before the '$_internalUpdateOnAddShard'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$set: {stockPrice: 100}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$set", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleSet) { + // + // Tests that multiple '$set' gets promoted before the '$_internalUpdateOnAddShard'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$set: {stockPrice: 100}}"), + fromjson("{$set: {quarter: 'Q1'}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$set", + "$set", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSetAndResumeToken) { + // + // Tests that the '$set' gets promoted before the '$_internalUpdateOnAddShard' if + // resume token is present. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$set" << BSON("stockPrice" << 100))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$set", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceRoot) { + // + // Tests that the single'$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard'. 
+ // + const std::vector<BSONObj> rawPipeline = { + fromjson("{$changeStream: {}}"), fromjson("{$replaceRoot: {newRoot: '$fullDocument'}}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$replaceRoot", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceRootAndResumeToken) { + // + // Tests that the '$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard' if + // resume token is present. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" << BSON("resumeAfter" << resumeToken)), + BSON("$replaceRoot" << BSON("newRoot" + << "$fullDocument"))}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$replaceRoot", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceWith) { + // + // Tests that the single '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' as + // '$replaceRoot'. + // + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"), + fromjson("{$replaceWith: '$fullDocument'}")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$replaceRoot", + "$_internalChangeStreamHandleTopologyChange"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceWithAndResumeToken) { + // + // Tests that the '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' if + // resume token is present as '$replaceRoot'. + // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = {BSON("$changeStream" + << BSON("resumeAfter" << resumeToken)), + BSON("$replaceWith" + << "$fullDocument")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$replaceRoot", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + +TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAllStagesAndResumeToken) { + // + // Tests that when all allowed stages are included along with the resume token, the final + // pipeline gets optimized. 
+ // + auto resumeToken = makeResumeToken(kDefaultTs, testUuid()); + + const std::vector<BSONObj> rawPipeline = {BSON("$changeStream" + << BSON("resumeAfter" << resumeToken)), + BSON("$project" << BSON("operationType" << 1)), + BSON("$unset" + << "_id"), + BSON("$addFields" << BSON("stockPrice" << 100)), + BSON("$set" + << BSON("fullDocument.stockPrice" << 100)), + BSON("$match" << BSON("operationType" + << "insert")), + BSON("$replaceRoot" << BSON("newRoot" + << "$fullDocument")), + BSON("$replaceWith" + << "fullDocument.stockPrice")}; + + auto pipeline = buildTestPipeline(rawPipeline); + + assertStagesNameOrder(std::move(pipeline), + {"$_internalChangeStreamOplogMatch", + "$_internalChangeStreamUnwindTransaction", + "$_internalChangeStreamTransform", + "$_internalChangeStreamCheckInvalidate", + "$_internalChangeStreamCheckResumability", + "$_internalChangeStreamCheckTopologyChange", + "$match", + "$project", + "$project", + "$addFields", + "$set", + "$replaceRoot", + "$replaceRoot", + "$_internalChangeStreamHandleTopologyChange", + "$_internalChangeStreamEnsureResumeTokenPresent"}); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 229b9229565..c8eacdda7df 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -390,7 +390,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document Value DocumentSourceChangeStreamTransform::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { - return Value(Document{{getSourceName(), + return Value(Document{{DocumentSourceChangeStream::kStageName, Document{{"stage"_sd, "internalTransform"_sd}, {"options"_sd, _changeStreamSpec.toBSON()}}}}); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 46fe08b7a49..f4782ca89cf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -64,8 +64,8 @@ public: return boost::none; } - const char* getSourceName() const { - return DocumentSourceChangeStream::kStageName.rawData(); + const char* getSourceName() const final { + return kStageName.rawData(); } protected: diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp index 04aa426bd1e..899dbe4de42 100644 --- a/src/mongo/db/pipeline/document_source_limit_test.cpp +++ b/src/mongo/db/pipeline/document_source_limit_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/unittest/unittest.h" @@ -90,6 +91,23 @@ TEST_F(DocumentSourceLimitTest, TwoLimitStagesShouldCombineIntoOne) { ASSERT_EQUALS(1U, container.size()); } +TEST_F(DocumentSourceLimitTest, DoesNotPushProjectBeforeSelf) { + Pipeline::SourceContainer container; + auto limit = DocumentSourceLimit::create(getExpCtx(), 10); + auto project = + DocumentSourceProject::create(BSON("fullDocument" << true), getExpCtx(), "$project"_sd); + + container.push_back(limit); + container.push_back(project); + + 
limit->optimizeAt(container.begin(), &container); + + ASSERT_EQUALS(2U, container.size()); + ASSERT(dynamic_cast<DocumentSourceLimit*>(container.begin()->get())); + ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>( + std::next(container.begin())->get())); +} + TEST_F(DocumentSourceLimitTest, DisposeShouldCascadeAllTheWayToSource) { auto source = DocumentSourceMock::createForTest({"{a: 1}", "{a: 1}"}, getExpCtx()); diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp index 703512557d6..6ab275e0417 100644 --- a/src/mongo/db/pipeline/document_source_match_test.cpp +++ b/src/mongo/db/pipeline/document_source_match_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/logv2/log.h" #include "mongo/unittest/death_test.h" @@ -492,6 +493,23 @@ TEST_F(DocumentSourceMatchTest, MultipleMatchStagesShouldCombineIntoOne) { "{c:1}]}")); } +TEST_F(DocumentSourceMatchTest, DoesNotPushProjectBeforeSelf) { + Pipeline::SourceContainer container; + auto match = DocumentSourceMatch::create(BSON("_id" << 1), getExpCtx()); + auto project = + DocumentSourceProject::create(BSON("fullDocument" << true), getExpCtx(), "$project"_sd); + + container.push_back(match); + container.push_back(project); + + match->optimizeAt(container.begin(), &container); + + ASSERT_EQUALS(2U, container.size()); + ASSERT(dynamic_cast<DocumentSourceMatch*>(container.begin()->get())); + ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>( + std::next(container.begin())->get())); +} + TEST_F(DocumentSourceMatchTest, ShouldPropagatePauses) { auto match = DocumentSourceMatch::create(BSON("a" << 1), getExpCtx()); auto mock = diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 980fc537f97..9227cc35f71 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/unittest/temp_dir.h" @@ -163,6 +164,23 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->distributedPlanLogic()->mergingStage.get())); } +TEST_F(DocumentSourceSortTest, DoesNotPushProjectBeforeSelf) { + Pipeline::SourceContainer container; + createSort(BSON("_id" << 1)); + auto project = + DocumentSourceProject::create(BSON("fullDocument" << true), getExpCtx(), "$project"_sd); + + container.push_back(sort()); + container.push_back(project); + + sort()->optimizeAt(container.begin(), &container); + + ASSERT_EQUALS(2U, container.size()); + ASSERT(dynamic_cast<DocumentSourceSort*>(container.begin()->get())); + ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>( + std::next(container.begin())->get())); +} + TEST_F(DocumentSourceSortTest, Dependencies) { createSort(BSON("a" << 1 << "b.c" << -1)); DepsTracker dependencies; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 
37586a16abd..fb81a3ed888 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -76,7 +76,7 @@ using std::vector; const NamespaceString kTestNss = NamespaceString("a.collection"); size_t getChangeStreamStageSize() { - return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 5 : 6); + return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 6 : 6); } void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) { diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index e491d99afdc..3f167f28530 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -176,11 +176,6 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional } void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& event) const { - // If we are producing output to be merged on mongoS, then no stages can have modified the _id. - if (_expCtx->needsMerge) { - return; - } - // Confirm that the document _id field matches the original resume token in the sort key field. auto eventBSON = event.toBson(); auto resumeToken = event.metadata().getSortKey(); diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h index 661b9954195..b520dfdbbdb 100644 --- a/src/mongo/db/pipeline/stage_constraints.h +++ b/src/mongo/db/pipeline/stage_constraints.h @@ -328,6 +328,10 @@ struct StageConstraints { // documents because our implementation of $sample shuffles the order bool canSwapWithSkippingOrLimitingStage = false; + // If true, then any stage of kind 'DocumentSourceSingleDocumentTransformation' can be swapped + // ahead of this stage. + bool canSwapWithSingleDocTransform = false; + // Indicates that a stage is allowed within a pipeline-stlye update. bool isAllowedWithinUpdatePipeline = false; diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index d6eca0d9b1b..236729e2359 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -489,6 +489,11 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { Status AsyncResultsMerger::scheduleGetMores() { stdx::lock_guard<Latch> lk(_mutex); + + if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + _assertNotInvalidated(lk); + } + return _scheduleGetMores(lk); } |
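One subtlety explains the jstests changes at the top of this patch: once a $match can swap ahead of $_internalChangeStreamEnsureResumeTokenPresent, a resuming pipeline's filter must also let the event carrying the resume token through, otherwise the token check downstream never sees its target. Hence the tests' rewrite of plain predicates into an $or against the token's _id. A minimal jstests-style sketch of that pattern, assuming coll and resumeToken exist in a surrounding test (assert.soon is a jstests helper, not plain shell):

```javascript
// Resume-safe filter: pass the resume-point event itself through the $match,
// and apply the real predicate to everything after it.
const changeStream = coll.watch(
    [{$match: {$or: [{_id: resumeToken}, {operationType: "update"}]}}],
    {resumeAfter: resumeToken});
assert.soon(() => changeStream.hasNext());  // Resumes cleanly instead of tripping the token check.
```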