summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-05-18 09:19:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-20 11:54:09 +0000
commitac173f107cf2642007bbae401eac5fa62106eda7 (patch)
treeed0794aaa46b4ecfffbcbdfd423e390996d5ff0c /src/mongo/db/pipeline
parent6997403b9fab1e397e1e13bbdf6e899aeee8b6fd (diff)
downloadmongo-ac173f107cf2642007bbae401eac5fa62106eda7.tar.gz
SERVER-55424: Rename change streams DocumentSources to include 'DocumentSourceChangeStream' prefix.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript12
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp59
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp (renamed from src/mongo/db/pipeline/document_source_check_invalidate.cpp)17
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h (renamed from src/mongo/db/pipeline/document_source_check_invalidate.h)22
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp (renamed from src/mongo/db/pipeline/document_source_check_resume_token.cpp)30
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.h (renamed from src/mongo/db/pipeline/document_source_check_resume_token.h)18
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h13
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp (renamed from src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp)24
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h (renamed from src/mongo/db/pipeline/document_source_lookup_change_post_image.h)17
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp (renamed from src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp)44
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.cpp (renamed from src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp)27
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h (renamed from src/mongo/db/pipeline/document_source_lookup_change_pre_image.h)17
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp22
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp32
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp (renamed from src/mongo/db/pipeline/document_source_update_on_add_shard.cpp)25
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h (renamed from src/mongo/db/pipeline/document_source_update_on_add_shard.h)8
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp52
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp24
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp2
25 files changed, 284 insertions, 247 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index af8c34fcbf3..9c167fb3b3b 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -341,17 +341,17 @@ env.Library(
'change_stream_document_diff_parser.cpp',
'change_stream_helpers_legacy.cpp',
'document_source_change_stream.cpp',
+ 'document_source_change_stream_check_invalidate.cpp',
+ 'document_source_change_stream_check_resumability.cpp',
'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_ensure_resume_token_present.cpp',
+ 'document_source_change_stream_lookup_post_image.cpp',
+ 'document_source_change_stream_lookup_pre_image.cpp',
'document_source_change_stream_oplog_match.cpp',
'document_source_change_stream_topology_change.cpp',
'document_source_change_stream_transform.cpp',
'document_source_change_stream_unwind_transactions.cpp',
- 'document_source_check_invalidate.cpp',
- 'document_source_check_resume_token.cpp',
- 'document_source_lookup_change_post_image.cpp',
- 'document_source_lookup_change_pre_image.cpp',
- 'document_source_update_on_add_shard.cpp',
+ 'document_source_change_stream_update_on_add_shard.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/pipeline/pipeline',
@@ -423,7 +423,7 @@ env.CppUnitTest(
'document_source_internal_shard_filter_test.cpp',
'document_source_internal_split_pipeline_test.cpp',
'document_source_limit_test.cpp',
- 'document_source_lookup_change_post_image_test.cpp',
+ 'document_source_change_stream_lookup_post_image_test.cpp',
'document_source_lookup_test.cpp',
'document_source_match_test.cpp',
'document_source_merge_cursors_test.cpp',
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 5d83629a751..65baaeb3e6b 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -31,17 +31,17 @@
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h"
-#include "mongo/db/pipeline/document_source_check_invalidate.h"
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
-#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
-#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
+#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
#include "mongo/db/pipeline/expression.h"
namespace mongo {
@@ -70,7 +70,8 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
const ResumeTokenData tokenData = token.getData();
// If resuming from an "invalidate" using "startAfter", pass along the resume token data to
- // DocumentSourceCheckInvalidate to signify that another invalidate should not be generated.
+ // DocumentSourceChangeStreamCheckInvalidate to signify that another invalidate should not
+ // be generated.
if (startAfter && tokenData.fromInvalidate) {
startAfterInvalidate = tokenData;
}
@@ -93,9 +94,10 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// before any event that would sort after it. High water mark tokens, however, do not refer
// to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'.
if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) {
- resumeStage = DocumentSourceCheckResumability::create(expCtx, tokenData);
+ resumeStage = DocumentSourceChangeStreamCheckResumability::create(expCtx, tokenData);
} else {
- resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, tokenData);
+ resumeStage =
+ DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, tokenData);
}
}
@@ -104,7 +106,8 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
!resumeStage);
- resumeStage = DocumentSourceCheckResumability::create(expCtx, *startAtOperationTime);
+ resumeStage =
+ DocumentSourceChangeStreamCheckResumability::create(expCtx, *startAtOperationTime);
}
auto transformStage = DocumentSourceChangeStreamTransform::createFromBson(rawSpec, expCtx);
@@ -115,7 +118,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies
// upon the fact that it is always the first stage in the pipeline.
- stages.push_back(DocumentSourceOplogMatch::create(expCtx, showMigrationEvents));
+ stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, showMigrationEvents));
stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
stages.push_back(transformStage);
@@ -131,24 +134,26 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// The resume stage must come after the check invalidate stage so that the former can determine
// whether the event that matches the resume token should be followed by an "invalidate" event.
- stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, startAfterInvalidate));
+ stages.push_back(
+ DocumentSourceChangeStreamCheckInvalidate::create(expCtx, startAfterInvalidate));
- // The resume stage 'DocumentSourceCheckResumability' should come before the split point stage
- // 'DocumentSourceUpdateOnAddShard'.
+ // The resume stage 'DocumentSourceChangeStreamCheckResumability' should come before the split
+ // point stage 'DocumentSourceChangeStreamUpdateOnAddShard'.
if (resumeStage &&
- resumeStage->getSourceName() == DocumentSourceCheckResumability::kStageName) {
+ resumeStage->getSourceName() == DocumentSourceChangeStreamCheckResumability::kStageName) {
stages.push_back(resumeStage);
resumeStage.reset();
}
- // If the pipeline is build on MongoS, then the stage 'DocumentSourceUpdateOnAddShard' acts as
- // the split point for the pipline. All stages before this stages will run on shards and all
- // stages after and inclusive of this stage will run on the MongoS.
+ // If the pipeline is build on MongoS, then the stage
+ // 'DocumentSourceChangeStreamUpdateOnAddShard' acts as the split point for the pipline. All
+ // stages before this stages will run on shards and all stages after and inclusive of this stage
+ // will run on the MongoS.
if (expCtx->inMongos) {
- stages.push_back(DocumentSourceUpdateOnAddShard::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamUpdateOnAddShard::create(expCtx));
}
- // This resume stage should be 'DocumentSourceEnsureResumeTokenPresent'.
+ // This resume stage should be 'DocumentSourceChangeStreamEnsureResumeTokenPresent'.
if (resumeStage) {
stages.push_back(resumeStage);
}
@@ -158,7 +163,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// There should only be one close cursor stage. If we're on the shards and producing
// input to be merged, do not add a close cursor stage, since the mongos will already
// have one.
- stages.push_back(DocumentSourceCloseCursor::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx));
}
// We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
@@ -169,13 +174,13 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// TODO SERVER-36941: figure out how to get this to work in a sharded cluster.
if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) {
invariant(!expCtx->inMongos);
- stages.push_back(DocumentSourceLookupChangePreImage::create(expCtx, spec));
+ stages.push_back(DocumentSourceChangeStreamLookupPreImage::create(expCtx, spec));
}
// There should be only one post-image lookup stage. If we're on the shards and producing
// input to be merged, the lookup is done on the mongos.
if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) {
- stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx));
}
}
@@ -183,7 +188,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
}
} // namespace change_stream_legacy
-Value DocumentSourceOplogMatch::serializeLegacy(
+Value DocumentSourceChangeStreamOplogMatch::serializeLegacy(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain ? Value(Document{{"$_internalOplogMatch"_sd, Document{}}}) : Value();
}
@@ -201,13 +206,13 @@ Value DocumentSourceChangeStreamTransform::serializeLegacy(
return Value(Document{{getSourceName(), _changeStreamSpec.toBSON()}});
}
-Value DocumentSourceCheckInvalidate::serializeLegacy(
+Value DocumentSourceChangeStreamCheckInvalidate::serializeLegacy(
boost::optional<ExplainOptions::Verbosity> explain) const {
// We only serialize this stage in the context of explain.
return explain ? Value(DOC(kStageName << Document())) : Value();
}
-Value DocumentSourceCheckResumability::serializeLegacy(
+Value DocumentSourceChangeStreamCheckResumability::serializeLegacy(
boost::optional<ExplainOptions::Verbosity> explain) const {
// We only serialize this stage in the context of explain.
return explain ? Value(DOC(getSourceName()
@@ -225,12 +230,12 @@ Value DocumentSourceChangeStreamUnwindTransaction::serializeLegacy(
return Value();
}
-Value DocumentSourceLookupChangePreImage::serializeLegacy(
+Value DocumentSourceChangeStreamLookupPreImage::serializeLegacy(
boost::optional<ExplainOptions::Verbosity> explain) const {
return (explain ? Value{Document{{kStageName, Document()}}} : Value());
}
-Value DocumentSourceLookupChangePostImage::serializeLegacy(
+Value DocumentSourceChangeStreamLookupPostImage::serializeLegacy(
boost::optional<ExplainOptions::Verbosity> explain) const {
return (explain ? Value{Document{{kStageName, Document()}}} : Value());
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a1543b79dfd..94c6f58adf0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -38,16 +38,16 @@
#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/document_path_support.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h"
-#include "mongo/db/pipeline/document_source_check_invalidate.h"
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
-#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
#include "mongo/db/pipeline/document_source_sort.h"
-#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/resume_token.h"
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
index fb8d12521ff..a5ee091e326 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -32,7 +32,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_check_invalidate.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/util/assert_util.h"
@@ -43,7 +43,7 @@ using DSCS = DocumentSourceChangeStream;
REGISTER_INTERNAL_DOCUMENT_SOURCE(
_internalChangeStreamCheckInvalidate,
LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceCheckInvalidate::createFromBson,
+ DocumentSourceChangeStreamCheckInvalidate::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
namespace {
@@ -65,7 +65,8 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt
} // namespace
-boost::intrusive_ptr<DocumentSourceCheckInvalidate> DocumentSourceCheckInvalidate::createFromBson(
+boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate>
+DocumentSourceChangeStreamCheckInvalidate::createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467602,
str::stream() << "the '" << kStageName << "' object spec must be an object",
@@ -74,14 +75,14 @@ boost::intrusive_ptr<DocumentSourceCheckInvalidate> DocumentSourceCheckInvalidat
auto parsed = DocumentSourceChangeStreamCheckInvalidateSpec::parse(
IDLParserErrorContext("DocumentSourceChangeStreamCheckInvalidateSpec"),
spec.embeddedObject());
- return new DocumentSourceCheckInvalidate(
+ return new DocumentSourceChangeStreamCheckInvalidate(
expCtx,
parsed.getStartAfterInvalidate()
? boost::optional<ResumeTokenData>(parsed.getStartAfterInvalidate()->getData())
: boost::none);
}
-DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNext() {
// To declare a change stream as invalidated, this stage first emits an invalidate event and
// then throws a 'ChangeStreamInvalidated' exception on the next call to this method.
@@ -160,7 +161,7 @@ DocumentSource::GetNextResult DocumentSourceCheckInvalidate::doGetNext() {
return nextInput;
}
-Value DocumentSourceCheckInvalidate::serializeLatest(
+Value DocumentSourceChangeStreamCheckInvalidate::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(Document{{DocumentSourceChangeStream::kStageName,
@@ -171,7 +172,7 @@ Value DocumentSourceCheckInvalidate::serializeLatest(
if (_startAfterInvalidate) {
spec.setStartAfterInvalidate(ResumeToken(*_startAfterInvalidate));
}
- return Value(Document{{DocumentSourceCheckInvalidate::kStageName, spec.toBSON()}});
+ return Value(Document{{DocumentSourceChangeStreamCheckInvalidate::kStageName, spec.toBSON()}});
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
index 88c9d3372f1..92647a6b48d 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -39,14 +39,15 @@ namespace mongo {
* "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop
* for a single-collection change stream). It is not intended to be created by the user.
*/
-class DocumentSourceCheckInvalidate final : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamCheckInvalidate final
+ : public DocumentSource,
+ public ChangeStreamStageSerializationInterface {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckInvalidate"_sd;
const char* getSourceName() const final {
// This is used in error reporting.
- return DocumentSourceCheckInvalidate::kStageName.rawData();
+ return DocumentSourceChangeStreamCheckInvalidate::kStageName.rawData();
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -69,20 +70,21 @@ public:
return ChangeStreamStageSerializationInterface::serializeToValue(explain);
}
- static boost::intrusive_ptr<DocumentSourceCheckInvalidate> createFromBson(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
- static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<ResumeTokenData> startAfterInvalidate) {
- return new DocumentSourceCheckInvalidate(expCtx, std::move(startAfterInvalidate));
+ return new DocumentSourceChangeStreamCheckInvalidate(expCtx,
+ std::move(startAfterInvalidate));
}
private:
/**
- * Use the create static method to create a DocumentSourceCheckInvalidate.
+ * Use the create static method to create a DocumentSourceChangeStreamCheckInvalidate.
*/
- DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- boost::optional<ResumeTokenData> startAfterInvalidate)
+ DocumentSourceChangeStreamCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<ResumeTokenData> startAfterInvalidate)
: DocumentSource(kStageName, expCtx),
_startAfterInvalidate(std::move(startAfterInvalidate)) {
invariant(!_startAfterInvalidate ||
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index d359e7f0bb3..ffb7ff6cd72 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/curop.h"
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -41,10 +41,10 @@ namespace {
REGISTER_INTERNAL_DOCUMENT_SOURCE(
_internalChangeStreamCheckResumability,
LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceCheckResumability::createFromBson,
+ DocumentSourceChangeStreamCheckResumability::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
-using ResumeStatus = DocumentSourceCheckResumability::ResumeStatus;
+using ResumeStatus = DocumentSourceChangeStreamCheckResumability::ResumeStatus;
// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
@@ -173,26 +173,28 @@ ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionConte
}
} // namespace
-DocumentSourceCheckResumability::DocumentSourceCheckResumability(
+DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResumability(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
: DocumentSource(getSourceName(), expCtx), _tokenFromClient(std::move(token)) {}
-intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
- const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
+intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
+DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
+ Timestamp ts) {
// We are resuming from a point in time, not an event. Seed the stage with a high water mark.
return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData());
}
-intrusive_ptr<DocumentSourceCheckResumability> DocumentSourceCheckResumability::create(
- const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
- return new DocumentSourceCheckResumability(expCtx, std::move(token));
+intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
+DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
+ ResumeTokenData token) {
+ return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(token));
}
-const char* DocumentSourceCheckResumability::getSourceName() const {
+const char* DocumentSourceChangeStreamCheckResumability::getSourceName() const {
return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceCheckResumability::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGetNext() {
if (_resumeStatus == ResumeStatus::kSurpassedToken) {
return pSource->getNext();
}
@@ -234,7 +236,7 @@ DocumentSource::GetNextResult DocumentSourceCheckResumability::doGetNext() {
MONGO_UNREACHABLE;
}
-Value DocumentSourceCheckResumability::serializeLatest(
+Value DocumentSourceChangeStreamCheckResumability::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
? Value(DOC(DocumentSourceChangeStream::kStageName
@@ -242,7 +244,7 @@ Value DocumentSourceCheckResumability::serializeLatest(
<< "internalCheckResumability"_sd
<< "resumeToken" << ResumeToken(_tokenFromClient).toDocument())))
: Value(Document{
- {DocumentSourceCheckResumability::kStageName,
+ {DocumentSourceChangeStreamCheckResumability::kStageName,
DocumentSourceChangeStreamCheckResumabilitySpec(ResumeToken(_tokenFromClient))
.toBSON()}});
}
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
index ad546f51b4b..9cbed08490a 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -59,8 +59,8 @@ namespace mongo {
* - Otherwise we cannot resume, as we do not know if there were any events between the resume token
* and the first matching document in the oplog.
*/
-class DocumentSourceCheckResumability : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamCheckResumability : public DocumentSource,
+ public ChangeStreamStageSerializationInterface {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckResumability"_sd;
@@ -94,7 +94,7 @@ public:
return ChangeStreamStageSerializationInterface::serializeToValue(explain);
}
- static boost::intrusive_ptr<DocumentSourceCheckResumability> createFromBson(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467603,
str::stream() << "the '" << kStageName << "' object spec must be an object",
@@ -106,18 +106,18 @@ public:
return create(expCtx, parsed.getResumeToken().getData());
}
- static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts);
- static boost::intrusive_ptr<DocumentSourceCheckResumability> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
protected:
/**
- * Use the create static method to create a DocumentSourceCheckResumability.
+ * Use the create static method to create a DocumentSourceChangeStreamCheckResumability.
*/
- DocumentSourceCheckResumability(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- ResumeTokenData token);
+ DocumentSourceChangeStreamCheckResumability(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
GetNextResult doGetNext() override;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
index a4a43c654ee..a7db8e647a0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
@@ -52,7 +52,7 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt
} // namespace
-DocumentSource::GetNextResult DocumentSourceCloseCursor::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamCloseCursor::doGetNext() {
// Close cursor if we have returned an invalidate entry.
if (_shouldCloseCursor) {
uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index 59be3f550b3..16a1e13f2d4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -41,13 +41,13 @@ namespace mongo {
* "invalidate" entries.
* It is not intended to be created by the user.
*/
-class DocumentSourceCloseCursor final : public DocumentSource {
+class DocumentSourceChangeStreamCloseCursor final : public DocumentSource {
public:
static constexpr StringData kStageName = "$changeStream"_sd;
const char* getSourceName() const final {
// This is used in error reporting.
- return DocumentSourceCloseCursor::kStageName.rawData();
+ return DocumentSourceChangeStreamCloseCursor::kStageName.rawData();
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -71,9 +71,9 @@ public:
return explain ? Value(DOC(kStageName << Document())) : Value();
}
- static boost::intrusive_ptr<DocumentSourceCloseCursor> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCloseCursor> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceCloseCursor(expCtx);
+ return new DocumentSourceChangeStreamCloseCursor(expCtx);
}
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
@@ -82,9 +82,9 @@ public:
private:
/**
- * Use the create static method to create a DocumentSourceCloseCursor.
+ * Use the create static method to create a DocumentSourceChangeStreamCloseCursor.
*/
- DocumentSourceCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ DocumentSourceChangeStreamCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
GetNextResult doGetNext() final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
index 61f382e3e9d..cb6dfc0ede1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
@@ -33,21 +33,22 @@
namespace mongo {
-DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
- : DocumentSourceCheckResumability(expCtx, std::move(token)) {}
+DocumentSourceChangeStreamEnsureResumeTokenPresent::
+ DocumentSourceChangeStreamEnsureResumeTokenPresent(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
+ : DocumentSourceChangeStreamCheckResumability(expCtx, std::move(token)) {}
-boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
-DocumentSourceEnsureResumeTokenPresent::create(
+boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
+DocumentSourceChangeStreamEnsureResumeTokenPresent::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
- return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
+ return new DocumentSourceChangeStreamEnsureResumeTokenPresent(expCtx, std::move(token));
}
-const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
+const char* DocumentSourceChangeStreamEnsureResumeTokenPresent::getSourceName() const {
return kStageName.rawData();
}
-DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamEnsureResumeTokenPresent::doGetNext() {
// If we have already verified the resume token is present, return the next doc immediately.
if (_resumeStatus == ResumeStatus::kSurpassedToken) {
return pSource->getNext();
@@ -59,9 +60,10 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext(
// occurred at the same clusterTime on more than one shard, then we may see multiple identical
// resume tokens here. We swallow all of them until the resume status becomes kSurpassedToken.
while (_resumeStatus != ResumeStatus::kSurpassedToken) {
- // Delegate to DocumentSourceCheckResumability to consume all events up to the token. This
- // will also set '_resumeStatus' to indicate whether we have seen or surpassed the token.
- nextInput = DocumentSourceCheckResumability::doGetNext();
+ // Delegate to DocumentSourceChangeStreamCheckResumability to consume all events up to the
+ // token. This will also set '_resumeStatus' to indicate whether we have seen or surpassed
+ // the token.
+ nextInput = DocumentSourceChangeStreamCheckResumability::doGetNext();
// If there are no more results, return EOF. We will continue checking for the resume token
// the next time the getNext method is called. If we hit EOF, then we cannot have surpassed
@@ -92,7 +94,7 @@ DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::doGetNext(
return nextInput;
}
-Value DocumentSourceEnsureResumeTokenPresent::serialize(
+Value DocumentSourceChangeStreamEnsureResumeTokenPresent::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
// We only serialize this stage in the context of explain.
if (explain) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
index c81f87b3617..b380f72d643 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
@@ -29,14 +29,15 @@
#pragma once
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
namespace mongo {
/**
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSourceCheckResumability {
+class DocumentSourceChangeStreamEnsureResumeTokenPresent final
+ : public DocumentSourceChangeStreamCheckResumability {
public:
static constexpr StringData kStageName = "$_internalEnsureResumeTokenPresent"_sd;
@@ -56,17 +57,17 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
private:
/**
- * Use the create static method to create a DocumentSourceEnsureResumeTokenPresent.
+ * Use the create static method to create a DocumentSourceChangeStreamEnsureResumeTokenPresent.
*/
- DocumentSourceEnsureResumeTokenPresent(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- ResumeTokenData token);
+ DocumentSourceChangeStreamEnsureResumeTokenPresent(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
GetNextResult doGetNext() final;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp
index caa1bc5c058..8659f71e325 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -29,20 +29,20 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
namespace mongo {
-constexpr StringData DocumentSourceLookupChangePostImage::kStageName;
-constexpr StringData DocumentSourceLookupChangePostImage::kFullDocumentFieldName;
+constexpr StringData DocumentSourceChangeStreamLookupPostImage::kStageName;
+constexpr StringData DocumentSourceChangeStreamLookupPostImage::kFullDocumentFieldName;
namespace {
REGISTER_INTERNAL_DOCUMENT_SOURCE(
_internalChangeStreamLookupPostImage,
LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceLookupChangePostImage::createFromBson,
+ DocumentSourceChangeStreamLookupPostImage::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
@@ -58,8 +58,8 @@ Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType
}
} // namespace
-boost::intrusive_ptr<DocumentSourceLookupChangePostImage>
-DocumentSourceLookupChangePostImage::createFromBson(
+boost::intrusive_ptr<DocumentSourceChangeStreamLookupPostImage>
+DocumentSourceChangeStreamLookupPostImage::createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467608,
str::stream() << "the '" << kStageName << "' stage spec must be an object",
@@ -70,10 +70,10 @@ DocumentSourceLookupChangePostImage::createFromBson(
str::stream() << "the 'fullDocument' field can only be 'updateLookup'",
parsedSpec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup);
- return new DocumentSourceLookupChangePostImage(expCtx);
+ return new DocumentSourceChangeStreamLookupPostImage(expCtx);
}
-DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamLookupPostImage::doGetNext() {
auto input = pSource->getNext();
if (!input.isAdvanced()) {
return input;
@@ -89,7 +89,7 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() {
return output.freeze();
}
-NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace(
+NamespaceString DocumentSourceChangeStreamLookupPostImage::assertValidNamespace(
const Document& inputDoc) const {
auto namespaceObject =
assertFieldHasType(inputDoc, DocumentSourceChangeStream::kNamespaceField, BSONType::Object)
@@ -110,7 +110,7 @@ NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace(
return nss;
}
-Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updateOp) const {
+Value DocumentSourceChangeStreamLookupPostImage::lookupPostImage(const Document& updateOp) const {
// Make sure we have a well-formed input.
auto nss = assertValidNamespace(updateOp);
@@ -147,7 +147,7 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL));
}
-Value DocumentSourceLookupChangePostImage::serializeLatest(
+Value DocumentSourceChangeStreamLookupPostImage::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
? Value(Document{{DocumentSourceChangeStream::kStageName,
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h
index c674d2218e3..2a7c9666e68 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -38,22 +38,23 @@ namespace mongo {
* Part of the change stream API machinery used to look up the post-image of a document. Uses the
* "documentKey" field of the input to look up the new version of the document.
*/
-class DocumentSourceLookupChangePostImage final : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamLookupPostImage final
+ : public DocumentSource,
+ public ChangeStreamStageSerializationInterface {
public:
static constexpr StringData kStageName = "$_internalChangeStreamLookupPostImage"_sd;
static constexpr StringData kFullDocumentFieldName =
DocumentSourceChangeStream::kFullDocumentField;
/**
- * Creates a DocumentSourceLookupChangePostImage stage.
+ * Creates a DocumentSourceChangeStreamLookupPostImage stage.
*/
- static boost::intrusive_ptr<DocumentSourceLookupChangePostImage> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPostImage> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceLookupChangePostImage(expCtx);
+ return new DocumentSourceChangeStreamLookupPostImage(expCtx);
}
- static boost::intrusive_ptr<DocumentSourceLookupChangePostImage> createFromBson(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPostImage> createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
@@ -107,7 +108,7 @@ public:
}
private:
- DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ DocumentSourceChangeStreamLookupPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
/**
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp
index 8ed1907f31c..6672e54f1c6 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image_test.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -41,7 +41,7 @@
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h"
@@ -55,7 +55,7 @@ using std::vector;
using MockMongoInterface = StubLookupSingleDocumentProcessInterface;
// This provides access to getExpCtx(), but we'll use a different name for this test suite.
-class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture {
+class DocumentSourceChangeStreamLookupPostImageTest : public AggregationContextFixture {
public:
/**
* This method is required to avoid a static initialization fiasco resulting from calling
@@ -78,11 +78,11 @@ public:
}
};
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with a document without a "documentKey" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -101,11 +101,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfMissingOperationType) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -124,11 +124,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfMissingNamespace) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -148,11 +148,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfNsFieldHasWrongType) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with a document without a "ns" field.
auto mockLocalSource =
@@ -171,11 +171,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40578);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -194,13 +194,14 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDatabaseMismatchOnCollectionlessNss) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest,
+ ShouldErrorIfDatabaseMismatchOnCollectionlessNss) {
auto expCtx = getExpCtx();
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test");
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -219,13 +220,14 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDatabaseMismatchOnC
ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPassIfDatabaseMatchesOnCollectionlessNss) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest,
+ ShouldPassIfDatabaseMatchesOnCollectionlessNss) {
auto expCtx = getExpCtx();
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test");
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock out the foreign collection.
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}};
@@ -251,11 +253,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPassIfDatabaseMatchesOnCol
{"fullDocument", Document{{"_id", 0}}}}));
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input with an update document.
auto mockLocalSource = DocumentSourceMock::createForTest(
@@ -277,11 +279,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni
lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments);
}
-TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
+TEST_F(DocumentSourceChangeStreamLookupPostImageTest, ShouldPropagatePauses) {
auto expCtx = getExpCtx();
// Set up the lookup change post image stage.
- auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+ auto lookupChangeStage = DocumentSourceChangeStreamLookupPostImage::create(expCtx);
// Mock its input, pausing every other result.
auto mockLocalSource = DocumentSourceMock::createForTest(
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.cpp
index f8510e741b5..b4a7c51a735 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2020-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -32,7 +32,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/db/transaction_history_iterator.h"
@@ -44,34 +44,35 @@ namespace {
REGISTER_INTERNAL_DOCUMENT_SOURCE(
_internalChangeStreamLookupPreImage,
LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceLookupChangePreImage::createFromBson,
+ DocumentSourceChangeStreamLookupPreImage::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
}
-constexpr StringData DocumentSourceLookupChangePreImage::kStageName;
-constexpr StringData DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName;
+constexpr StringData DocumentSourceChangeStreamLookupPreImage::kStageName;
+constexpr StringData DocumentSourceChangeStreamLookupPreImage::kFullDocumentBeforeChangeFieldName;
-boost::intrusive_ptr<DocumentSourceLookupChangePreImage> DocumentSourceLookupChangePreImage::create(
+boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage>
+DocumentSourceChangeStreamLookupPreImage::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
auto mode = spec.getFullDocumentBeforeChange();
- return make_intrusive<DocumentSourceLookupChangePreImage>(expCtx, mode);
+ return make_intrusive<DocumentSourceChangeStreamLookupPreImage>(expCtx, mode);
}
-boost::intrusive_ptr<DocumentSourceLookupChangePreImage>
-DocumentSourceLookupChangePreImage::createFromBson(
+boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage>
+DocumentSourceChangeStreamLookupPreImage::createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467610,
str::stream() << "the '" << kStageName << "' stage spec must be an object",
elem.type() == BSONType::Object);
auto parsedSpec = DocumentSourceChangeStreamLookUpPreImageSpec::parse(
IDLParserErrorContext("DocumentSourceChangeStreamLookUpPreImageSpec"), elem.Obj());
- return make_intrusive<DocumentSourceLookupChangePreImage>(
+ return make_intrusive<DocumentSourceChangeStreamLookupPreImage>(
expCtx, parsedSpec.getFullDocumentBeforeChange());
}
-DocumentSource::GetNextResult DocumentSourceLookupChangePreImage::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamLookupPreImage::doGetNext() {
auto input = pSource->getNext();
if (!input.isAdvanced()) {
return input;
@@ -112,7 +113,7 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePreImage::doGetNext() {
return outputDoc.freeze();
}
-boost::optional<Document> DocumentSourceLookupChangePreImage::lookupPreImage(
+boost::optional<Document> DocumentSourceChangeStreamLookupPreImage::lookupPreImage(
const Document& inputDoc, const repl::OpTime& opTime) const {
// We need the oplog's UUID for lookup, so obtain the collection info via MongoProcessInterface.
auto localOplogInfo = pExpCtx->mongoProcessInterface->getCollectionOptions(
@@ -153,7 +154,7 @@ boost::optional<Document> DocumentSourceLookupChangePreImage::lookupPreImage(
return Document{opLogEntry.getObject().getOwned()};
}
-Value DocumentSourceLookupChangePreImage::serializeLatest(
+Value DocumentSourceChangeStreamLookupPreImage::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
? Value(Document{
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h
index c06b1b4a7e0..10e4028ee40 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2020-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -41,25 +41,26 @@ namespace mongo {
* its "fullDocumentBeforeChange" field shall be the optime of the noop oplog entry containing the
* pre-image. This stage replaces that field with the actual pre-image document.
*/
-class DocumentSourceLookupChangePreImage final : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamLookupPreImage final
+ : public DocumentSource,
+ public ChangeStreamStageSerializationInterface {
public:
static constexpr StringData kStageName = "$_internalChangeStreamLookupPreImage"_sd;
static constexpr StringData kFullDocumentBeforeChangeFieldName =
DocumentSourceChangeStream::kFullDocumentBeforeChangeField;
/**
- * Creates a DocumentSourceLookupChangePostImage stage.
+ * Creates a DocumentSourceChangeStreamLookupPostImage stage.
*/
- static boost::intrusive_ptr<DocumentSourceLookupChangePreImage> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
- static boost::intrusive_ptr<DocumentSourceLookupChangePreImage> createFromBson(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamLookupPreImage> createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
- DocumentSourceLookupChangePreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- FullDocumentBeforeChangeModeEnum mode)
+ DocumentSourceChangeStreamLookupPreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ FullDocumentBeforeChangeModeEnum mode)
: DocumentSource(kStageName, expCtx), _fullDocumentBeforeChangeMode(mode) {
// This stage should never be created with FullDocumentBeforeChangeMode::kOff.
invariant(_fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kOff);
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
index ba4655cc137..6f5e55cd47d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
@@ -37,7 +37,7 @@ namespace mongo {
REGISTER_INTERNAL_DOCUMENT_SOURCE(
_internalChangeStreamOplogMatch,
LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceOplogMatch::createFromBson,
+ DocumentSourceChangeStreamOplogMatch::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
namespace {
@@ -192,11 +192,12 @@ BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-boost::intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, bool showMigrationEvents) {
+boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch>
+DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ bool showMigrationEvents) {
// TODO SERVER-56669: ensure that 'initialPostBatchResumeToken' is always populated at this
// point.
- return make_intrusive<DocumentSourceOplogMatch>(
+ return make_intrusive<DocumentSourceChangeStreamOplogMatch>(
buildMatchFilter(
expCtx,
ResumeToken::parse(expCtx->initialPostBatchResumeToken).getData().clusterTime,
@@ -204,23 +205,24 @@ boost::intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create(
expCtx);
}
-boost::intrusive_ptr<DocumentSource> DocumentSourceOplogMatch::createFromBson(
+boost::intrusive_ptr<DocumentSource> DocumentSourceChangeStreamOplogMatch::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(5467600,
"the match filter must be an expression in an object",
elem.type() == BSONType::Object);
auto parsedSpec = DocumentSourceChangeStreamOplogMatchSpec::parse(
IDLParserErrorContext("DocumentSourceChangeStreamOplogMatchSpec"), elem.Obj());
- return make_intrusive<DocumentSourceOplogMatch>(parsedSpec.getFilter(), pExpCtx);
+ return make_intrusive<DocumentSourceChangeStreamOplogMatch>(parsedSpec.getFilter(), pExpCtx);
}
-const char* DocumentSourceOplogMatch::getSourceName() const {
+const char* DocumentSourceChangeStreamOplogMatch::getSourceName() const {
// This is used in error reporting, particularly if we find this stage in a position other
// than first, so report the name as $changeStream.
return DocumentSourceChangeStream::kStageName.rawData();
}
-StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipeState) const {
+StageConstraints DocumentSourceChangeStreamOplogMatch::constraints(
+ Pipeline::SplitState pipeState) const {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
@@ -235,7 +237,7 @@ StageConstraints DocumentSourceOplogMatch::constraints(Pipeline::SplitState pipe
return constraints;
}
-Value DocumentSourceOplogMatch::serializeLatest(
+Value DocumentSourceChangeStreamOplogMatch::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(
@@ -244,7 +246,7 @@ Value DocumentSourceOplogMatch::serializeLatest(
}
DocumentSourceChangeStreamOplogMatchSpec spec(_predicate);
- return Value(Document{{DocumentSourceOplogMatch::kStageName, spec.toBSON()}});
+ return Value(Document{{DocumentSourceChangeStreamOplogMatch::kStageName, spec.toBSON()}});
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
index 55641a8429b..06d6602ca49 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
@@ -36,18 +36,20 @@ namespace mongo {
* A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied
* on the oplog. The stage requires itself to be the first stage in the pipeline.
*/
-class DocumentSourceOplogMatch final : public DocumentSourceMatch,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch,
+ public ChangeStreamStageSerializationInterface {
public:
static constexpr StringData kStageName = "$_internalChangeStreamOplogMatch"_sd;
- DocumentSourceOplogMatch(BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ DocumentSourceChangeStreamOplogMatch(BSONObj filter,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(std::move(filter), expCtx) {
// A change stream pipeline should always create a tailable + awaitData cursor.
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
}
- DocumentSourceOplogMatch(const DocumentSourceOplogMatch& other) : DocumentSourceMatch(other) {}
+ DocumentSourceChangeStreamOplogMatch(const DocumentSourceChangeStreamOplogMatch& other)
+ : DocumentSourceMatch(other) {}
virtual boost::intrusive_ptr<DocumentSourceMatch> clone() const {
return make_intrusive<std::decay_t<decltype(*this)>>(*this);
@@ -55,7 +57,7 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- static boost::intrusive_ptr<DocumentSourceOplogMatch> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, bool showMigrationEvents);
const char* getSourceName() const final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index aa91f342bb5..df6fbb1e5d6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -44,15 +44,15 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h"
-#include "mongo/db/pipeline/document_source_check_invalidate.h"
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
-#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -252,12 +252,12 @@ public:
getExpCtx()->mongoProcessInterface =
std::make_unique<MockMongoInterface>(std::vector<FieldPath>{});
- // This match stage is a DocumentSourceOplogMatch, which we explicitly disallow from
- // executing as a safety mechanism, since it needs to use the collection-default collation,
- // even if the rest of the pipeline is using some other collation. To avoid ever executing
- // that stage here, we'll up-convert it from the non-executable DocumentSourceOplogMatch to
- // a fully-executable DocumentSourceMatch. This is safe because all of the unit tests will
- // use the 'simple' collation.
+ // This match stage is a DocumentSourceChangeStreamOplogMatch, which we explicitly disallow
+ // from executing as a safety mechanism, since it needs to use the collection-default
+ // collation, even if the rest of the pipeline is using some other collation. To avoid ever
+ // executing that stage here, we'll up-convert it from the non-executable
+ // DocumentSourceChangeStreamOplogMatch to a fully-executable DocumentSourceMatch. This is
+ // safe because all of the unit tests will use the 'simple' collation.
auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get());
ASSERT(match);
auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx());
@@ -275,7 +275,7 @@ public:
// Remove the DSEnsureResumeTokenPresent stage since it will swallow the result.
auto newEnd = std::remove_if(stages.begin(), stages.end(), [](auto& stage) {
- return dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage.get());
+ return dynamic_cast<DocumentSourceChangeStreamEnsureResumeTokenPresent*>(stage.get());
});
stages.erase(newEnd, stages.end());
@@ -2176,7 +2176,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSOplogMatchStageSeriali
spec.setFilter(dummyFilter);
auto stageSpecAsBSON = BSON("" << spec.toBSON());
- validateDocumentSourceStageSerialization<DocumentSourceOplogMatch>(
+ validateDocumentSourceStageSerialization<DocumentSourceChangeStreamOplogMatch>(
std::move(spec), stageSpecAsBSON, expCtx);
}
@@ -2199,7 +2199,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSCheckInvalidateStageSe
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)));
auto stageSpecAsBSON = BSON("" << spec.toBSON());
- validateDocumentSourceStageSerialization<DocumentSourceCheckInvalidate>(
+ validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckInvalidate>(
std::move(spec), stageSpecAsBSON, expCtx);
}
@@ -2210,7 +2210,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSResumabilityStageSeria
spec.setResumeToken(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid())));
auto stageSpecAsBSON = BSON("" << spec.toBSON());
- validateDocumentSourceStageSerialization<DocumentSourceCheckResumability>(
+ validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckResumability>(
std::move(spec), stageSpecAsBSON, expCtx);
}
@@ -2220,7 +2220,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePreImageSt
DocumentSourceChangeStreamLookUpPreImageSpec spec(FullDocumentBeforeChangeModeEnum::kRequired);
auto stageSpecAsBSON = BSON("" << spec.toBSON());
- validateDocumentSourceStageSerialization<DocumentSourceLookupChangePreImage>(
+ validateDocumentSourceStageSerialization<DocumentSourceChangeStreamLookupPreImage>(
std::move(spec), stageSpecAsBSON, expCtx);
}
@@ -2230,7 +2230,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePostImageS
DocumentSourceChangeStreamLookUpPostImageSpec spec(FullDocumentModeEnum::kUpdateLookup);
auto stageSpecAsBSON = BSON("" << spec.toBSON());
- validateDocumentSourceStageSerialization<DocumentSourceLookupChangePostImage>(
+ validateDocumentSourceStageSerialization<DocumentSourceChangeStreamLookupPostImage>(
std::move(spec), stageSpecAsBSON, expCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 4d5610878f0..0f62b87ec4f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -36,9 +36,9 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
#include "mongo/db/pipeline/document_path_support.h"
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp
index 35dff608271..48a3e7db187 100644
--- a/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,7 +27,7 @@
* it in the license file.
*/
-#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
+#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
#include <algorithm>
@@ -77,16 +77,17 @@ bool isShardConfigEvent(const Document& eventDoc) {
}
} // namespace
-boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create(
+boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard>
+DocumentSourceChangeStreamUpdateOnAddShard::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceUpdateOnAddShard(expCtx);
+ return new DocumentSourceChangeStreamUpdateOnAddShard(expCtx);
}
-DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
+DocumentSourceChangeStreamUpdateOnAddShard::DocumentSourceChangeStreamUpdateOnAddShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
-DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamUpdateOnAddShard::doGetNext() {
// For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be
// populated. We also resolve the original aggregation command from the expression context.
if (!_mergeCursors) {
@@ -112,11 +113,13 @@ DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
return childResult;
}
-void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShardDetectedObj) {
+void DocumentSourceChangeStreamUpdateOnAddShard::addNewShardCursors(
+ const Document& newShardDetectedObj) {
_mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
}
-std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards(
+std::vector<RemoteCursor>
+DocumentSourceChangeStreamUpdateOnAddShard::establishShardCursorsOnNewShards(
const Document& newShardDetectedObj) {
// Reload the shard registry to see the new shard.
auto* opCtx = pExpCtx->opCtx;
@@ -143,7 +146,8 @@ std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsO
allowPartialResults);
}
-BSONObj DocumentSourceUpdateOnAddShard::createUpdatedCommandForNewShard(Timestamp shardAddedTime) {
+BSONObj DocumentSourceChangeStreamUpdateOnAddShard::createUpdatedCommandForNewShard(
+ Timestamp shardAddedTime) {
// We must start the new cursor from the moment at which the shard became visible.
const auto newShardAddedTime = LogicalTime{shardAddedTime};
auto resumeTokenForNewShard =
@@ -174,7 +178,8 @@ BSONObj DocumentSourceUpdateOnAddShard::createUpdatedCommandForNewShard(Timestam
true /* needsMerge */);
}
-BSONObj DocumentSourceUpdateOnAddShard::replaceResumeTokenInCommand(Document resumeToken) {
+BSONObj DocumentSourceChangeStreamUpdateOnAddShard::replaceResumeTokenInCommand(
+ Document resumeToken) {
Document originalCmd(_originalAggregateCommand);
auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray();
diff --git a/src/mongo/db/pipeline/document_source_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h
index 82e981572c1..4270ff8b1ec 100644
--- a/src/mongo/db/pipeline/document_source_update_on_add_shard.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -45,7 +45,7 @@ namespace mongo {
* the first time. When this event is detected, this stage will establish a new cursor on that
* shard and add it to the cursors being merged.
*/
-class DocumentSourceUpdateOnAddShard final : public DocumentSource {
+class DocumentSourceChangeStreamUpdateOnAddShard final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalUpdateOnAddShard"_sd;
@@ -53,7 +53,7 @@ public:
* Creates a new stage which will establish a new cursor and add it to the cursors being merged
* by 'mergeCursorsStage' whenever a new shard is detected by a change stream.
*/
- static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard> create(
const boost::intrusive_ptr<ExpressionContext>&);
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
@@ -77,7 +77,7 @@ public:
}
private:
- DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&);
+ DocumentSourceChangeStreamUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&);
GetNextResult doGetNext() final;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index e8fde0f3d7c..f81f1880616 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -40,8 +40,8 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
-#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h"
@@ -312,10 +312,10 @@ protected:
/**
* Convenience method to create the class under test with a given ResumeTokenData.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
- ResumeTokenData tokenData) {
+ intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
+ createDSEnsureResumeTokenPresent(ResumeTokenData tokenData) {
auto checkResumeToken =
- DocumentSourceEnsureResumeTokenPresent::create(getExpCtx(), tokenData);
+ DocumentSourceChangeStreamEnsureResumeTokenPresent::create(getExpCtx(), tokenData);
_mock->setResumeToken(std::move(tokenData));
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
@@ -325,12 +325,12 @@ protected:
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
- Timestamp ts,
- int version,
- std::size_t txnOpIndex,
- boost::optional<Document> docKey,
- UUID uuid) {
+ intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
+ createDSEnsureResumeTokenPresent(Timestamp ts,
+ int version,
+ std::size_t txnOpIndex,
+ boost::optional<Document> docKey,
+ UUID uuid) {
return createDSEnsureResumeTokenPresent(
{ts, version, txnOpIndex, uuid, docKey ? Value(*docKey) : Value()});
}
@@ -339,8 +339,10 @@ protected:
* Convenience method to create the class under test with a given timestamp, docKey, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
- Timestamp ts, boost::optional<Document> docKey, UUID uuid = testUuid()) {
+ intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
+ createDSEnsureResumeTokenPresent(Timestamp ts,
+ boost::optional<Document> docKey,
+ UUID uuid = testUuid()) {
return createDSEnsureResumeTokenPresent(ts, 0, 0, docKey, uuid);
}
@@ -348,8 +350,8 @@ protected:
* Convenience method to create the class under test with a given timestamp, _id string, and
* namespace.
*/
- intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(
- Timestamp ts, StringData id, UUID uuid = testUuid()) {
+ intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
+ createDSEnsureResumeTokenPresent(Timestamp ts, StringData id, UUID uuid = testUuid()) {
return createDSEnsureResumeTokenPresent(ts, 0, 0, Document{{"_id", id}}, uuid);
}
@@ -367,14 +369,16 @@ protected:
class CheckResumabilityTest : public CheckResumeTokenTest {
protected:
- intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(
+ intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability(
ResumeTokenData tokenData) {
- auto dsCheckResumability = DocumentSourceCheckResumability::create(getExpCtx(), tokenData);
+ auto dsCheckResumability =
+ DocumentSourceChangeStreamCheckResumability::create(getExpCtx(), tokenData);
_mock->setResumeToken(std::move(tokenData));
dsCheckResumability->setSource(_mock.get());
return dsCheckResumability;
}
- intrusive_ptr<DocumentSourceCheckResumability> createDSCheckResumability(Timestamp ts) {
+ intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability(
+ Timestamp ts) {
return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
}
};
@@ -493,8 +497,8 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) {
// Verify that a resume token whose documentKey only contains _id can be used to resume a stream
// on a sharded collection as long as its _id matches the first document. We set 'inMongos'
- // since this behaviour is only applicable when DocumentSourceEnsureResumeTokenPresent is
- // running on mongoS.
+ // since this behaviour is only applicable when
+ // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS.
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
@@ -533,8 +537,8 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes
TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumentKeyFields) {
// Verify that the relaxed _id check only applies if _id is the sole field present in the
// client's resume token, even if all the fields that are present match the first doc. We set
- // 'inMongos' since this is only applicable when DocumentSourceEnsureResumeTokenPresent is
- // running on mongoS.
+ // 'inMongos' since this is only applicable when
+ // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS.
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
@@ -976,7 +980,8 @@ TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event
+ // ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
auto dsCheckResumability = createDSCheckResumability(token);
@@ -1005,7 +1010,8 @@ TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeT
TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceCheckResumability to check for an exact event ResumeToken.
+ // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event
+ // ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
auto dsCheckResumability = createDSCheckResumability(token);
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 2f6e05a0fc8..d7aa35e1c66 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -40,12 +40,12 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_facet.h"
#include "mongo/db/pipeline/document_source_graph_lookup.h"
#include "mongo/db/pipeline/document_source_internal_split_pipeline.h"
#include "mongo/db/pipeline/document_source_lookup.h"
-#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
-#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_source_out.h"
@@ -2061,7 +2061,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the change lookup is at the end.
- ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPostImage*>(stages.back().get()));
auto matchPredicate = BSON("extra"
<< "predicate");
@@ -2070,7 +2070,8 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
pipeline->optimizePipeline();
// Make sure the $match stage has swapped before the change look up.
- ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(pipeline->getSources().back().get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPostImage*>(
+ pipeline->getSources().back().get()));
}
TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage) {
@@ -2087,10 +2088,11 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the change lookup is at the end.
- ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPostImage*>(stages.back().get()));
stages.push_back(DocumentSourceMatch::create(
- BSON(DocumentSourceLookupChangePostImage::kFullDocumentFieldName << BSONNULL), expCtx));
+ BSON(DocumentSourceChangeStreamLookupPostImage::kFullDocumentFieldName << BSONNULL),
+ expCtx));
auto pipeline = Pipeline::create(stages, expCtx);
pipeline->optimizePipeline();
@@ -2112,7 +2114,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the pre-image lookup is at the end.
- ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPreImage*>(stages.back().get()));
auto matchPredicate = BSON("extra"
<< "predicate");
@@ -2121,7 +2123,8 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen
pipeline->optimizePipeline();
// Make sure the $match stage has swapped before the change look up.
- ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(pipeline->getSources().back().get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPreImage*>(
+ pipeline->getSources().back().get()));
}
TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPreImage) {
@@ -2138,10 +2141,11 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
ASSERT_EQ(stages.size(), getChangeStreamStageSize());
// Make sure the pre-image lookup is at the end.
- ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get()));
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamLookupPreImage*>(stages.back().get()));
stages.push_back(DocumentSourceMatch::create(
- BSON(DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName << BSONNULL),
+ BSON(DocumentSourceChangeStreamLookupPreImage::kFullDocumentBeforeChangeFieldName
+ << BSONNULL),
expCtx));
auto pipeline = Pipeline::create(stages, expCtx);
pipeline->optimizePipeline();
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 7d405768fc2..8f495bcb756 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
@@ -47,7 +48,6 @@
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
-#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/semantic_analysis.h"
#include "mongo/db/vector_clock.h"