summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/collation.js12
-rw-r--r--jstests/change_streams/shell_helper.js5
-rw-r--r--jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js11
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source.cpp21
-rw-r--r--src/mongo/db/pipeline/document_source.h25
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp30
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h19
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp30
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h19
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp549
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h4
-rw-r--r--src/mongo/db/pipeline/document_source_limit_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_match_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_sort_test.cpp18
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp2
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp5
-rw-r--r--src/mongo/db/pipeline/stage_constraints.h4
-rw-r--r--src/mongo/s/query/async_results_merger.cpp5
21 files changed, 747 insertions, 54 deletions
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js
index 4abc2f06ffb..531690afd19 100644
--- a/jstests/change_streams/collation.js
+++ b/jstests/change_streams/collation.js
@@ -240,9 +240,9 @@ assertDropCollection(db, collName);
// Test that a $changeStream is allowed to resume on the dropped collection with an explicit
// collation, even if it doesn't match the original collection's default collation.
-changeStream =
- caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}],
- {resumeAfter: resumeToken, collation: {locale: "simple"}});
+changeStream = caseInsensitiveCollection.watch(
+ [{$match: {$or: [{"_id": resumeToken}, {"fullDocument.text": "ABC"}]}}],
+ {resumeAfter: resumeToken, collation: {locale: "simple"}});
assert.soon(() => changeStream.hasNext());
assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
@@ -304,9 +304,9 @@ if (!isChangeStreamPassthrough()) {
// Test that a pipeline with an explicit collation is allowed to resume from before the
// collection is dropped and recreated.
-changeStream =
- caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}],
- {resumeAfter: resumeToken, collation: {locale: "fr"}});
+changeStream = caseInsensitiveCollection.watch(
+ [{$match: {$or: [{"_id": resumeToken}, {"fullDocument.text": "ABC"}]}}],
+ {resumeAfter: resumeToken, collation: {locale: "fr"}});
assert.soon(() => changeStream.hasNext());
assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index edf55161c6a..de7cf737280 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -131,8 +131,9 @@ if (!isMongos) {
}
// Only watch the "update" changes of the specific doc since the beginning.
- changeStreamCursor = coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}],
- {resumeAfter: resumeToken, batchSize: 2});
+ changeStreamCursor = coll.watch(
+ [{$match: {$or: [{_id: resumeToken}, {documentKey: {_id: 1}, operationType: "update"}]}}],
+ {resumeAfter: resumeToken, batchSize: 2});
// Check the first batch.
assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
index 8d0fc941020..078b85261a7 100644
--- a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
+++ b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
@@ -80,11 +80,12 @@ assert.throwsWithCode(function() {
// don't trip the validation checks for the existence of the pre-image.
for (let runOnDB of [testDB, adminDB]) {
// Open a whole-db or whole-cluster stream that filters for the 'collWithPreImages' namespace.
- const csCursor = runOnDB.watch([{$match: {"ns.coll": collWithPreImages.getName()}}], {
- fullDocumentBeforeChange: "required",
- resumeAfter: resumeToken,
- allChangesForCluster: (runOnDB === adminDB)
- });
+ const csCursor = runOnDB.watch(
+ [{$match: {$or: [{_id: resumeToken}, {"ns.coll": collWithPreImages.getName()}]}}], {
+ fullDocumentBeforeChange: "required",
+ resumeAfter: resumeToken,
+ allChangesForCluster: (runOnDB === adminDB)
+ });
// The list of events and pre-images that we expect to see in the stream.
const expectedPreImageEvents = [
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 0fff6070ba3..dbbf223f0ba 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -138,7 +138,7 @@ Value DocumentSourceChangeStreamTransform::serializeLegacy(
_changeStreamSpec.getResumeAfter() || _changeStreamSpec.getStartAtOperationTime() ||
_changeStreamSpec.getStartAfter());
- return Value(Document{{getSourceName(), _changeStreamSpec.toBSON()}});
+ return Value(Document{{DocumentSourceChangeStream::kStageName, _changeStreamSpec.toBSON()}});
}
Value DocumentSourceChangeStreamCheckInvalidate::serializeLegacy(
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 997f8c1f06a..3c183d9e847 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -34,9 +34,12 @@
#include "mongo/db/commands/feature_compatibility_version_documentation.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/matcher/expression_algo.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_internal_shard_filter.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sequential_document_cache.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -239,13 +242,25 @@ bool DocumentSource::pushSampleBefore(Pipeline::SourceContainer::iterator itr,
return false;
}
+bool DocumentSource::pushSingleDocumentTransformBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ auto singleDocTransform =
+ dynamic_cast<DocumentSourceSingleDocumentTransformation*>((*std::next(itr)).get());
+
+ if (constraints().canSwapWithSingleDocTransform && singleDocTransform) {
+ container->insert(itr, std::move(singleDocTransform));
+ container->erase(std::next(itr));
+ return true;
+ }
+ return false;
+}
+
Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
- // Attempt to swap 'itr' with a subsequent $match or subsequent $sample.
- if (std::next(itr) != container->end() &&
- (pushMatchBefore(itr, container) || pushSampleBefore(itr, container))) {
+ // Attempt to swap 'itr' with a subsequent stage, if applicable.
+ if (attemptToPushStageBefore(itr, container)) {
// The stage before the pushed before stage may be able to optimize further, if there is
// such a stage.
return std::prev(itr) == container->begin() ? std::prev(itr) : std::prev(std::prev(itr));
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 9bc9f1cab2c..49cc5cddcfd 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -442,6 +442,31 @@ private:
bool pushSampleBefore(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container);
+ /**
+ * Attempts to push any kind of 'DocumentSourceSingleDocumentTransformation' stage directly
+ * ahead of the stage present at the 'itr' position if matches the constraints. Returns true if
+ * optimization was performed, false otherwise.
+ *
+ * Note that this optimization is oblivious to the transform function. The only stages that are
+ * eligible to swap are those that can safely swap with any transform.
+ */
+ bool pushSingleDocumentTransformBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container);
+
+ /**
+ * Wraps various optimization methods and returns the call immediately if any one of them
+ * returns true.
+ */
+ bool attemptToPushStageBefore(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ if (std::next(itr) == container->end()) {
+ return false;
+ }
+
+ return pushMatchBefore(itr, container) || pushSampleBefore(itr, container) ||
+ pushSingleDocumentTransformBefore(itr, container);
+ }
+
public:
/**
* The non-virtual public interface for optimization. Attempts to do some generic optimizations
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
index 7ea37e67da8..4ba82351811 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
@@ -31,6 +31,8 @@
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
+
namespace mongo {
DocumentSourceChangeStreamEnsureResumeTokenPresent::
@@ -53,6 +55,34 @@ const char* DocumentSourceChangeStreamEnsureResumeTokenPresent::getSourceName()
return kStageName.rawData();
}
+
+StageConstraints DocumentSourceChangeStreamEnsureResumeTokenPresent::constraints(
+ Pipeline::SplitState) const {
+ StageConstraints constraints{StreamType::kStreaming,
+ PositionRequirement::kNone,
+ // If this is parsed on mongos it should stay on mongos. If we're
+ // not in a sharded cluster then it's okay to run on mongod.
+ HostTypeRequirement::kLocalOnly,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kNotAllowed,
+ UnionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+
+ // The '$match' and 'DocumentSourceSingleDocumentTransformation' stages can swap with this
+ // stage, allowing filtering and reshaping to occur earlier in the pipeline. For sharded cluster
+ // pipelines, swaps can allow $match and 'DocumentSourceSingleDocumentTransformation' stages to
+ // execute on the shards, providing inter-node parallelism and potentially reducing the amount
+ // of data sent form each shard to the mongoS.
+ if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ constraints.canSwapWithMatch = true;
+ constraints.canSwapWithSingleDocTransform = true;
+ }
+
+ return constraints;
+}
+
DocumentSource::GetNextResult DocumentSourceChangeStreamEnsureResumeTokenPresent::doGetNext() {
// If we have already verified the resume token is present, return the next doc immediately.
if (_resumeStatus == ResumeStatus::kSurpassedToken) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
index 57b91107913..ae484dbff51 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
@@ -39,22 +39,15 @@ namespace mongo {
class DocumentSourceChangeStreamEnsureResumeTokenPresent final
: public DocumentSourceChangeStreamCheckResumability {
public:
- static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd;
+ static constexpr StringData kStageName = "$_internalChangeStreamEnsureResumeTokenPresent"_sd;
const char* getSourceName() const final;
- StageConstraints constraints(Pipeline::SplitState) const final {
- return {StreamType::kStreaming,
- PositionRequirement::kNone,
- // If this is parsed on mongos it should stay on mongos. If we're not in a sharded
- // cluster then it's okay to run on mongod.
- HostTypeRequirement::kLocalOnly,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kNotAllowed,
- LookupRequirement::kNotAllowed,
- UnionRequirement::kNotAllowed,
- ChangeStreamRequirement::kChangeStreamStage};
+ StageConstraints constraints(Pipeline::SplitState) const final;
+
+ GetModPathsReturn getModifiedPaths() const final {
+ // This stage neither modifies nor renames any field.
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
static boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> create(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
index 962e48fe4b8..d4c1d917799 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
@@ -55,8 +55,11 @@ bool isShardConfigEvent(const Document& eventDoc) {
// with 4.2, even though we no longer rely on them to detect new shards. We swallow the event
// here. We may wish to remove this mechanism entirely in 4.7+, or retain it for future cases
// where a change stream is targeted to a subset of shards. See SERVER-44039 for details.
- if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
- DocumentSourceChangeStream::kNewShardDetectedOpType) {
+
+ auto opType = eventDoc[DocumentSourceChangeStream::kOperationTypeField];
+
+ if (!opType.missing() &&
+ opType.getStringData() == DocumentSourceChangeStream::kNewShardDetectedOpType) {
// If the failpoint is enabled, throw the 'ChangeStreamToplogyChange' exception to the
// client. This is used in testing to confirm that the swallowed 'kNewShardDetected' event
// has reached the mongoS.
@@ -87,6 +90,29 @@ DocumentSourceChangeStreamHandleTopologyChange::DocumentSourceChangeStreamHandle
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
+StageConstraints DocumentSourceChangeStreamHandleTopologyChange::constraints(
+ Pipeline::SplitState) const {
+ StageConstraints constraints{StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kMongoS,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kNotAllowed,
+ UnionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+
+ // Can be swapped with the '$match' and 'DocumentSourceSingleDocumentTransformation' stages and
+ // ensures that they get pushed down to the shards, as this stage bisects the change streams
+ // pipeline.
+ if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ constraints.canSwapWithMatch = true;
+ constraints.canSwapWithSingleDocTransform = true;
+ }
+
+ return constraints;
+}
+
DocumentSource::GetNextResult DocumentSourceChangeStreamHandleTopologyChange::doGetNext() {
// For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be
// populated. We also resolve the original aggregation command from the expression context.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
index 1d240be5e3c..339d0db2f4d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
@@ -56,20 +56,19 @@ public:
static boost::intrusive_ptr<DocumentSourceChangeStreamHandleTopologyChange> create(
const boost::intrusive_ptr<ExpressionContext>&);
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
return (explain ? Value(Document{{kStageName, Value()}}) : Value());
}
- virtual StageConstraints constraints(Pipeline::SplitState) const {
- return {StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kMongoS,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kNotAllowed,
- LookupRequirement::kNotAllowed,
- UnionRequirement::kNotAllowed,
- ChangeStreamRequirement::kChangeStreamStage};
+ StageConstraints constraints(Pipeline::SplitState) const final;
+
+ GetModPathsReturn getModifiedPaths() const final {
+ // This stage neither modifies nor renames any field.
+ return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
index 5dd28772c6c..a37869bd035 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
@@ -212,7 +212,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceChangeStreamOplogMatch::creat
const char* DocumentSourceChangeStreamOplogMatch::getSourceName() const {
// This is used in error reporting, particularly if we find this stage in a position other
// than first, so report the name as $changeStream.
- return DocumentSourceChangeStream::kStageName.rawData();
+ return kStageName.rawData();
}
StageConstraints DocumentSourceChangeStreamOplogMatch::constraints(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index a340096f5e7..7452d9f7d98 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -267,7 +267,7 @@ public:
// Check the oplog entry is transformed correctly.
auto transform = stages[2].get();
ASSERT(transform);
- ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName);
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform));
// Create mock stage and insert at the front of the stages.
auto mock = DocumentSourceMock::createForTest(D(entry), getExpCtx());
@@ -466,6 +466,44 @@ public:
}
};
+class ChangeStreamPipelineOptimizationTest : public ChangeStreamStageTest {
+public:
+ explicit ChangeStreamPipelineOptimizationTest() : ChangeStreamStageTest() {}
+
+ void run() {
+ RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsOptimization",
+ true);
+ ASSERT(getCSOptimizationFeatureFlagValue());
+ ChangeStreamStageTest::run();
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> buildTestPipeline(
+ const std::vector<BSONObj>& rawPipeline) {
+ auto expCtx = getExpCtx();
+ expCtx->ns = NamespaceString("a.collection");
+ expCtx->inMongos = true;
+
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ pipeline->optimizePipeline();
+
+ return pipeline;
+ }
+
+ void assertStagesNameOrder(std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ const std::vector<std::string> expectedStages) {
+ ASSERT_EQ(pipeline->getSources().size(), expectedStages.size());
+
+ auto stagesItr = pipeline->getSources().begin();
+ auto expectedStagesItr = expectedStages.begin();
+
+ while (expectedStagesItr != expectedStages.end()) {
+ ASSERT_EQ(*expectedStagesItr, stagesItr->get()->getSourceName());
+ ++expectedStagesItr;
+ ++stagesItr;
+ }
+ }
+};
+
TEST_F(ChangeStreamStageTest, ShouldRejectNonObjectArg) {
auto expCtx = getExpCtx();
@@ -2166,7 +2204,6 @@ template <typename Stage, typename StageSpec>
void validateDocumentSourceStageSerialization(
StageSpec spec, BSONObj specAsBSON, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
auto stage = Stage::createFromBson(specAsBSON.firstElement(), expCtx);
-
vector<Value> serialization;
stage->serializeToArray(serialization);
if (getCSOptimizationFeatureFlagValue()) {
@@ -3304,5 +3341,513 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleMatch) {
+ //
+ // Tests that the single '$match' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$changeStream: {}}"),
+ fromjson("{$match: {operationType: 'insert'}}"),
+ };
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatch) {
+ //
+ // Tests that multiple '$match' gets merged and promoted before the
+ // '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$match: {operationType: 'insert'}}"),
+ fromjson("{$match: {operationType: 'delete'}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatchAndResumeToken) {
+ //
+ // Tests that multiple '$match' gets merged and promoted before the
+ // '$_internalUpdateOnAddShard' if resume token if present.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$match" << BSON("operationType"
+ << "insert")),
+ BSON("$match" << BSON("operationType"
+ << "insert"))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleProject) {
+ //
+ // Tests that the single'$project' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$project: {operationType: 1}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProject) {
+ //
+ // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$project: {operationType: 1}}"),
+ fromjson("{$project: {fullDocument: 1}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$project",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProjectAndResumeToken) {
+ //
+ // Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard' if
+ // resume token is present.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$project" << BSON("operationType" << 1)),
+ BSON("$project" << BSON("fullDocument" << 1))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$project",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithProjectMatchAndResumeToken) {
+ //
+ // Tests that a '$project' followed by a '$match' gets optimized and they get promoted before
+ // the '$_internalUpdateOnAddShard'.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$project" << BSON("operationType" << 1)),
+ BSON("$match" << BSON("operationType"
+ << "insert"))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleUnset) {
+ //
+ // Tests that the single'$unset' gets promoted before the '$_internalUpdateOnAddShard' as
+ // '$project'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$unset: 'operationType'}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleUnset) {
+ //
+ // Tests that multiple '$unset' gets promoted before the '$_internalUpdateOnAddShard' as
+ // '$project'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$unset: 'operationType'}"),
+ fromjson("{$unset: 'fullDocument'}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$project",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithUnsetAndResumeToken) {
+ //
+ // Tests that the '$unset' gets promoted before the '$_internalUpdateOnAddShard' as '$project'
+ // even if resume token is present.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {BSON("$changeStream"
+ << BSON("resumeAfter" << resumeToken)),
+ BSON("$unset"
+ << "operationType")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$project",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleAddFields) {
+ //
+ // Tests that the single'$addFields' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$addFields: {stockPrice: 100}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$addFields",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleAddFields) {
+ //
+ // Tests that multiple '$addFields' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$addFields: {stockPrice: 100}}"),
+ fromjson("{$addFields: {quarter: 'Q1'}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$addFields",
+ "$addFields",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAddFieldsAndResumeToken) {
+ //
+ // Tests that the '$addFields' gets promoted before the '$_internalUpdateOnAddShard' if
+ // resume token is present.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$addFields" << BSON("stockPrice" << 100))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$addFields",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleSet) {
+ //
+ // Tests that the single'$set' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$set: {stockPrice: 100}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$set",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleSet) {
+ //
+ // Tests that multiple '$set' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$set: {stockPrice: 100}}"),
+ fromjson("{$set: {quarter: 'Q1'}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$set",
+ "$set",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSetAndResumeToken) {
+ //
+ // Tests that the '$set' gets promoted before the '$_internalUpdateOnAddShard' if
+ // resume token is present.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$set" << BSON("stockPrice" << 100))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$set",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceRoot) {
+ //
+ // Tests that the single'$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard'.
+ //
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$changeStream: {}}"), fromjson("{$replaceRoot: {newRoot: '$fullDocument'}}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$replaceRoot",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceRootAndResumeToken) {
+ //
+ // Tests that the '$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard' if
+ // resume token is present.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ BSON("$replaceRoot" << BSON("newRoot"
+ << "$fullDocument"))};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$replaceRoot",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceWith) {
+ //
+ // Tests that the single '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' as
+ // '$replaceRoot'.
+ //
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}"),
+ fromjson("{$replaceWith: '$fullDocument'}")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$replaceRoot",
+ "$_internalChangeStreamHandleTopologyChange"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceWithAndResumeToken) {
+ //
+ // Tests that the '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' if
+ // resume token is present as '$replaceRoot'.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {BSON("$changeStream"
+ << BSON("resumeAfter" << resumeToken)),
+ BSON("$replaceWith"
+ << "$fullDocument")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$replaceRoot",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
+TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAllStagesAndResumeToken) {
+ //
+ // Tests that when all allowed stages are included along with the resume token, the final
+ // pipeline gets optimized.
+ //
+ auto resumeToken = makeResumeToken(kDefaultTs, testUuid());
+
+ const std::vector<BSONObj> rawPipeline = {BSON("$changeStream"
+ << BSON("resumeAfter" << resumeToken)),
+ BSON("$project" << BSON("operationType" << 1)),
+ BSON("$unset"
+ << "_id"),
+ BSON("$addFields" << BSON("stockPrice" << 100)),
+ BSON("$set"
+ << BSON("fullDocument.stockPrice" << 100)),
+ BSON("$match" << BSON("operationType"
+ << "insert")),
+ BSON("$replaceRoot" << BSON("newRoot"
+ << "$fullDocument")),
+ BSON("$replaceWith"
+ << "fullDocument.stockPrice")};
+
+ auto pipeline = buildTestPipeline(rawPipeline);
+
+ assertStagesNameOrder(std::move(pipeline),
+ {"$_internalChangeStreamOplogMatch",
+ "$_internalChangeStreamUnwindTransaction",
+ "$_internalChangeStreamTransform",
+ "$_internalChangeStreamCheckInvalidate",
+ "$_internalChangeStreamCheckResumability",
+ "$_internalChangeStreamCheckTopologyChange",
+ "$match",
+ "$project",
+ "$project",
+ "$addFields",
+ "$set",
+ "$replaceRoot",
+ "$replaceRoot",
+ "$_internalChangeStreamHandleTopologyChange",
+ "$_internalChangeStreamEnsureResumeTokenPresent"});
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 229b9229565..c8eacdda7df 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -390,7 +390,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
Value DocumentSourceChangeStreamTransform::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
- return Value(Document{{getSourceName(),
+ return Value(Document{{DocumentSourceChangeStream::kStageName,
Document{{"stage"_sd, "internalTransform"_sd},
{"options"_sd, _changeStreamSpec.toBSON()}}}});
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 46fe08b7a49..f4782ca89cf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -64,8 +64,8 @@ public:
return boost::none;
}
- const char* getSourceName() const {
- return DocumentSourceChangeStream::kStageName.rawData();
+ const char* getSourceName() const final {
+ return kStageName.rawData();
}
protected:
diff --git a/src/mongo/db/pipeline/document_source_limit_test.cpp b/src/mongo/db/pipeline/document_source_limit_test.cpp
index 04aa426bd1e..899dbe4de42 100644
--- a/src/mongo/db/pipeline/document_source_limit_test.cpp
+++ b/src/mongo/db/pipeline/document_source_limit_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/unittest.h"
@@ -90,6 +91,23 @@ TEST_F(DocumentSourceLimitTest, TwoLimitStagesShouldCombineIntoOne) {
ASSERT_EQUALS(1U, container.size());
}
+TEST_F(DocumentSourceLimitTest, DoesNotPushProjectBeforeSelf) {
+ Pipeline::SourceContainer container;
+ auto limit = DocumentSourceLimit::create(getExpCtx(), 10);
+ auto project =
+ DocumentSourceProject::create(BSON("fullDocument" << true), getExpCtx(), "$project"_sd);
+
+ container.push_back(limit);
+ container.push_back(project);
+
+ limit->optimizeAt(container.begin(), &container);
+
+ ASSERT_EQUALS(2U, container.size());
+ ASSERT(dynamic_cast<DocumentSourceLimit*>(container.begin()->get()));
+ ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(
+ std::next(container.begin())->get()));
+}
+
TEST_F(DocumentSourceLimitTest, DisposeShouldCascadeAllTheWayToSource) {
auto source = DocumentSourceMock::createForTest({"{a: 1}", "{a: 1}"}, getExpCtx());
diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp
index 703512557d6..6ab275e0417 100644
--- a/src/mongo/db/pipeline/document_source_match_test.cpp
+++ b/src/mongo/db/pipeline/document_source_match_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/death_test.h"
@@ -492,6 +493,23 @@ TEST_F(DocumentSourceMatchTest, MultipleMatchStagesShouldCombineIntoOne) {
"{c:1}]}"));
}
+TEST_F(DocumentSourceMatchTest, DoesNotPushProjectBeforeSelf) {
+ Pipeline::SourceContainer container;
+ auto match = DocumentSourceMatch::create(BSON("_id" << 1), getExpCtx());
+ auto project =
+ DocumentSourceProject::create(BSON("fullDocument" << true), getExpCtx(), "$project"_sd);
+
+ container.push_back(match);
+ container.push_back(project);
+
+ match->optimizeAt(container.begin(), &container);
+
+ ASSERT_EQUALS(2U, container.size());
+ ASSERT(dynamic_cast<DocumentSourceMatch*>(container.begin()->get()));
+ ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(
+ std::next(container.begin())->get()));
+}
+
TEST_F(DocumentSourceMatchTest, ShouldPropagatePauses) {
auto match = DocumentSourceMatch::create(BSON("a" << 1), getExpCtx());
auto mock =
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 980fc537f97..9227cc35f71 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/temp_dir.h"
@@ -163,6 +164,23 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->distributedPlanLogic()->mergingStage.get()));
}
+TEST_F(DocumentSourceSortTest, DoesNotPushProjectBeforeSelf) {
+ Pipeline::SourceContainer container;
+ createSort(BSON("_id" << 1));
+ auto project =
+ DocumentSourceProject::create(BSON("fullDocument" << true), getExpCtx(), "$project"_sd);
+
+ container.push_back(sort());
+ container.push_back(project);
+
+ sort()->optimizeAt(container.begin(), &container);
+
+ ASSERT_EQUALS(2U, container.size());
+ ASSERT(dynamic_cast<DocumentSourceSort*>(container.begin()->get()));
+ ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(
+ std::next(container.begin())->get()));
+}
+
TEST_F(DocumentSourceSortTest, Dependencies) {
createSort(BSON("a" << 1 << "b.c" << -1));
DepsTracker dependencies;
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 37586a16abd..fb81a3ed888 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -76,7 +76,7 @@ using std::vector;
const NamespaceString kTestNss = NamespaceString("a.collection");
size_t getChangeStreamStageSize() {
- return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 5 : 6);
+ return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 6 : 6);
}
void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) {
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index e491d99afdc..3f167f28530 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -176,11 +176,6 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
}
void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& event) const {
- // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
- if (_expCtx->needsMerge) {
- return;
- }
-
// Confirm that the document _id field matches the original resume token in the sort key field.
auto eventBSON = event.toBson();
auto resumeToken = event.metadata().getSortKey();
diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h
index 661b9954195..b520dfdbbdb 100644
--- a/src/mongo/db/pipeline/stage_constraints.h
+++ b/src/mongo/db/pipeline/stage_constraints.h
@@ -328,6 +328,10 @@ struct StageConstraints {
// documents because our implementation of $sample shuffles the order
bool canSwapWithSkippingOrLimitingStage = false;
+ // If true, then any stage of kind 'DocumentSourceSingleDocumentTransformation' can be swapped
+ // ahead of this stage.
+ bool canSwapWithSingleDocTransform = false;
+
// Indicates that a stage is allowed within a pipeline-stlye update.
bool isAllowedWithinUpdatePipeline = false;
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index d6eca0d9b1b..236729e2359 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -489,6 +489,11 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
Status AsyncResultsMerger::scheduleGetMores() {
stdx::lock_guard<Latch> lk(_mutex);
+
+ if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ _assertNotInvalidated(lk);
+ }
+
return _scheduleGetMores(lk);
}