summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-10-11 14:23:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-15 22:08:36 +0000
commit37104ac0e21fdffd29de16be74f4a35d4e1c865c (patch)
tree8fab578349c6581c4ad3ff6e6d8f1c82235dc96c /src/mongo/db/pipeline
parentdad32a29132b9427a9d742f4f4d2ecf3bc3d830f (diff)
downloadmongo-37104ac0e21fdffd29de16be74f4a35d4e1c865c.tar.gz
SERVER-55659 Remove feature flag for Allow $changeStream to participate
in optimization.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp121
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h20
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.h19
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.h11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp80
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h95
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.h10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp202
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h11
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp4
28 files changed, 147 insertions, 597 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index ad8d1b310dc..7bf050b06ab 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -382,7 +382,6 @@ env.Library(
'document_source_change_stream_check_invalidate.cpp',
'document_source_change_stream_check_resumability.cpp',
'document_source_change_stream_check_topology_change.cpp',
- 'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_ensure_resume_token_present.cpp',
'document_source_change_stream_handle_topology_change.cpp',
'document_source_change_stream_add_pre_image.cpp',
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 75cd1ec29d8..861f86f0992 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -36,7 +36,6 @@
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
-#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
@@ -49,6 +48,12 @@ namespace change_stream_legacy {
std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) {
+ // The only case where we expect to build a legacy pipeline is if we are a shard which has
+ // received a $changeStream request from an older mongoS.
+ tassert(5565900,
+ "Unexpected {needsMerge:false} request for a legacy change stream pipeline",
+ expCtx->needsMerge);
+
std::list<boost::intrusive_ptr<DocumentSource>> stages;
const auto userRequestedResumePoint =
@@ -65,6 +70,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec));
stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, spec));
+
tassert(5467606,
"'DocumentSourceChangeStreamTransform' stage should populate "
"'initialPostBatchResumeToken' field",
@@ -74,50 +80,8 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
// whether the event that matches the resume token should be followed by an "invalidate" event.
stages.push_back(DocumentSourceChangeStreamCheckInvalidate::create(expCtx, spec));
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
-
- // If the user-requested resume point is a high water mark, or if we are running on the shards
- // in a cluster, we must include a DSCSCheckResumability stage.
- if (expCtx->needsMerge ||
- (userRequestedResumePoint && ResumeToken::isHighWaterMarkToken(resumeToken))) {
- stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec));
- }
-
- // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as
- // the split point for the pipline. All stages before this stages will run on shards and all
- // stages after and inclusive of this stage will run on the MongoS.
- if (expCtx->inMongos) {
- stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx));
- }
-
- // If the resume token is from an event, we must include a DSCSEnsureResumeTokenPresent stage.
- // In a cluster, this will be on mongoS and should not be generated on the shards.
- if (!expCtx->needsMerge && !ResumeToken::isHighWaterMarkToken(resumeToken)) {
- stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec));
- }
-
- if (!expCtx->needsMerge) {
- // There should only be one close cursor stage. If we're on the shards and producing input
- // to be merged, do not add a close cursor stage, since the mongos will already have one.
- stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx));
-
- // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
- // so that any $match stages which follow the $changeStream pipeline prefix may be able to
- // skip ahead of the DSCSAddPreImage stage. This allows a whole-db or whole-cluster stream
- // to run on an instance where only some collections have pre-images enabled, so long as the
- // user filters for only those namespaces.
- // TODO SERVER-36941: figure out how to get this to work in a sharded cluster.
- if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) {
- invariant(!expCtx->inMongos);
- stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec));
- }
-
- // There should be only one post-image lookup stage. If we're on the shards and producing
- // input to be merged, the lookup is done on the mongos.
- if (spec.getFullDocument() != FullDocumentModeEnum::kDefault) {
- stages.push_back(DocumentSourceChangeStreamAddPostImage::create(expCtx, spec));
- }
- }
+ // We must always check that the shard is capable of resuming from the specified point.
+ stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec));
return stages;
}
@@ -157,71 +121,6 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
return Document{opLogEntry.getObject().getOwned()};
}
-} // namespace change_stream_legacy
-
-Value DocumentSourceChangeStreamOplogMatch::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- return explain ? Value(Document{{"$_internalOplogMatch"_sd, Document{}}}) : Value();
-}
-
-Value DocumentSourceChangeStreamTransform::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- tassert(
- 5467607,
- str::stream() << "At least one of 'resumeAfter', 'startAfter' or 'startAtOperationTime' "
- "fields should be present to serialize "
- << DocumentSourceChangeStreamTransform::kStageName << " stage",
- _changeStreamSpec.getResumeAfter() || _changeStreamSpec.getStartAtOperationTime() ||
- _changeStreamSpec.getStartAfter());
-
- return Value(Document{{DocumentSourceChangeStream::kStageName, _changeStreamSpec.toBSON()}});
-}
-
-Value DocumentSourceChangeStreamCheckInvalidate::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // We only serialize this stage in the context of explain.
- return explain ? Value(DOC(kStageName << Document())) : Value();
-}
-
-Value DocumentSourceChangeStreamCheckResumability::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- // We only serialize this stage in the context of explain.
- return explain ? Value(DOC(getSourceName()
- << DOC("resumeToken" << ResumeToken(_tokenFromClient).toDocument())))
- : Value();
-}
-
-Value DocumentSourceChangeStreamUnwindTransaction::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- if (explain) {
- return Value(
- Document{{kStageName,
- Value(Document{
- {"nsRegex",
- DocumentSourceChangeStream::getNsRegexForChangeStream(pExpCtx->ns)}})}});
- }
-
- return Value();
-}
-
-Value DocumentSourceChangeStreamAddPreImage::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- return (explain ? Value{Document{{kStageName, Document()}}} : Value());
-}
-
-Value DocumentSourceChangeStreamAddPostImage::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- return (explain ? Value{Document{{kStageName, Document()}}} : Value());
-}
-
-Value DocumentSourceChangeStreamCheckTopologyChange::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- return (explain ? Value{Document{{kStageName, Document()}}} : Value());
-}
-
-Value DocumentSourceChangeStreamHandleTopologyChange::serializeLegacy(
- boost::optional<ExplainOptions::Verbosity> explain) const {
- return (explain ? Value(Document{{kStageName, Value()}}) : Value());
-}
+} // namespace change_stream_legacy
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 2e6fd767e62..11b551ac755 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
-#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
@@ -233,8 +232,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// If we see this stage on a shard, it means that the raw $changeStream stage was dispatched to
// us from an old mongoS. Build a legacy shard pipeline.
- if (expCtx->needsMerge ||
- !feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ if (expCtx->needsMerge) {
return change_stream_legacy::buildPipeline(expCtx, spec);
}
return _buildPipeline(expCtx, spec);
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index b58c31108d6..4cc02667ab2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -258,24 +258,4 @@ public:
}
};
-/**
- * Class interface to keep track of the change streams internal stage serialization formats across
- * versions or features.
- *
- * TODO SERVER-55659: remove this serializer class and make each stage serialize only the "latest"
- * format.
- */
-class ChangeStreamStageSerializationInterface {
-public:
- Value serializeToValue(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const {
- return feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()
- ? serializeLatest(explain)
- : serializeLegacy(explain);
- }
-
-protected:
- virtual Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const = 0;
- virtual Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const = 0;
-};
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
index fa0cf402095..051922556a5 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
@@ -43,11 +43,10 @@ constexpr StringData DocumentSourceChangeStreamAddPostImage::kStageName;
constexpr StringData DocumentSourceChangeStreamAddPostImage::kFullDocumentFieldName;
namespace {
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamAddPostImage,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamAddPostImage::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPostImage,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamAddPostImage::createFromBson,
+ true);
Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType expectedType) {
@@ -217,7 +216,7 @@ boost::optional<Document> DocumentSourceChangeStreamAddPostImage::lookupLatestPo
pExpCtx, nss, *resumeTokenData.uuid, documentKey, std::move(readConcern));
}
-Value DocumentSourceChangeStreamAddPostImage::serializeLatest(
+Value DocumentSourceChangeStreamAddPostImage::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
? Value(Document{
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
index 2732639eb9d..d92994064e2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
@@ -38,9 +38,7 @@ namespace mongo {
* Part of the change stream API machinery used to look up the post-image of a document. Uses the
* "documentKey" field of the input to look up the new version of the document.
*/
-class DocumentSourceChangeStreamAddPostImage final
- : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamAddPostImage final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamAddPostImage"_sd;
static constexpr StringData kFullDocumentFieldName =
@@ -77,15 +75,9 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
invariant(pipeState != Pipeline::SplitState::kSplitForShards);
- // TODO SERVER-55659: remove the feature flag.
- HostTypeRequirement hostTypeRequirement =
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()
- ? HostTypeRequirement::kAnyShard
- : HostTypeRequirement::kLocalOnly;
-
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- hostTypeRequirement,
+ HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
@@ -122,9 +114,7 @@ public:
return DepsTracker::State::SEE_NEXT;
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
const char* getSourceName() const final {
return kStageName.rawData();
@@ -158,9 +148,6 @@ private:
*/
NamespaceString assertValidNamespace(const Document& inputDoc) const;
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
// Determines whether post-images are strictly required or may be included only when available,
// and whether to return a point-in-time post-image or the most current majority-committed
// version of the updated document.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
index 329c497a5dd..25ca17bbbfa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp
@@ -106,8 +106,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForE
Document{{"stage"_sd, DocumentSourceChangeStreamAddPostImage::kStageName},
{"fullDocument"_sd, "updateLookup"_sd}}}});
- ASSERT_VALUE_EQ(stage->serializeToValue({ExplainOptions::Verbosity::kQueryPlanner}),
- expectedOutput);
+ ASSERT_VALUE_EQ(stage->serialize({ExplainOptions::Verbosity::kQueryPlanner}), expectedOutput);
}
TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForDispatch) {
@@ -117,7 +116,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldSerializeAsExpectedForD
const auto expectedOutput = Value(Document{{DocumentSourceChangeStreamAddPostImage::kStageName,
Document{{"fullDocument"_sd, "updateLookup"_sd}}}});
- ASSERT_VALUE_EQ(stage->serializeToValue(), expectedOutput);
+ ASSERT_VALUE_EQ(stage->serialize(), expectedOutput);
}
TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
index 40ac5ef1dab..dad3a18b306 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
@@ -43,11 +43,10 @@
namespace mongo {
namespace {
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamAddPreImage,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamAddPreImage::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamAddPreImage,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamAddPreImage::createFromBson,
+ true);
} // namespace
constexpr StringData DocumentSourceChangeStreamAddPreImage::kStageName;
@@ -165,7 +164,7 @@ boost::optional<Document> DocumentSourceChangeStreamAddPreImage::lookupPreImage(
return lookedUpDoc->getField(ChangeStreamPreImage::kPreImageFieldName).getDocument().getOwned();
}
-Value DocumentSourceChangeStreamAddPreImage::serializeLatest(
+Value DocumentSourceChangeStreamAddPreImage::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
? Value(Document{
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h
index 7c0153d3519..8e9c764fe19 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h
@@ -41,8 +41,7 @@ namespace mongo {
* its "fullDocumentBeforeChange" field shall be the optime of the noop oplog entry containing the
* pre-image. This stage replaces that field with the actual pre-image document.
*/
-class DocumentSourceChangeStreamAddPreImage final : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamAddPreImage final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamAddPreImage"_sd;
static constexpr StringData kFullDocumentBeforeChangeFieldName =
@@ -109,9 +108,7 @@ public:
return DepsTracker::State::SEE_NEXT;
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
const char* getSourceName() const final {
return kStageName.rawData();
@@ -123,9 +120,6 @@ private:
*/
GetNextResult doGetNext() final;
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
// Determines whether pre-images are strictly required or may be included only when available.
FullDocumentBeforeChangeModeEnum _fullDocumentBeforeChangeMode =
FullDocumentBeforeChangeModeEnum::kOff;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
index e386cc73bb2..84b1f023d1e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
@@ -41,11 +41,10 @@ namespace mongo {
using DSCS = DocumentSourceChangeStream;
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamCheckInvalidate,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamCheckInvalidate::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckInvalidate,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamCheckInvalidate::createFromBson,
+ true);
namespace {
@@ -103,8 +102,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNe
return res;
}
- if (_queuedException &&
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ if (_queuedException) {
uasserted(static_cast<ChangeStreamInvalidationInfo>(*_queuedException),
"Change stream invalidated");
}
@@ -179,7 +177,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNe
return nextInput;
}
-Value DocumentSourceChangeStreamCheckInvalidate::serializeLatest(
+Value DocumentSourceChangeStreamCheckInvalidate::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(Document{{DocumentSourceChangeStream::kStageName,
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
index 30dc1b6eeda..166604c1d10 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
@@ -39,9 +39,7 @@ namespace mongo {
* "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop
* for a single-collection change stream). It is not intended to be created by the user.
*/
-class DocumentSourceChangeStreamCheckInvalidate final
- : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamCheckInvalidate final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckInvalidate"_sd;
@@ -66,9 +64,7 @@ public:
return boost::none;
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -91,9 +87,6 @@ private:
GetNextResult doGetNext() final;
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
boost::optional<ResumeTokenData> _startAfterInvalidate;
boost::optional<Document> _queuedInvalidate;
boost::optional<ChangeStreamInvalidationInfo> _queuedException;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index 4c5ef221616..b68edc81899 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -38,11 +38,10 @@ using boost::intrusive_ptr;
namespace mongo {
namespace {
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamCheckResumability,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamCheckResumability::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamCheckResumability::createFromBson,
+ true);
} // namespace
@@ -246,7 +245,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGet
MONGO_UNREACHABLE;
}
-Value DocumentSourceChangeStreamCheckResumability::serializeLatest(
+Value DocumentSourceChangeStreamCheckResumability::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
return explain
? Value(DOC(DocumentSourceChangeStream::kStageName
@@ -258,4 +257,5 @@ Value DocumentSourceChangeStreamCheckResumability::serializeLatest(
DocumentSourceChangeStreamCheckResumabilitySpec(ResumeToken(_tokenFromClient))
.toBSON()}});
}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
index 5b2caea6b26..a290d59cacd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
@@ -59,8 +59,7 @@ namespace mongo {
* - Otherwise we cannot resume, as we do not know if there were any events between the resume token
* and the first matching document in the oplog.
*/
-class DocumentSourceChangeStreamCheckResumability : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamCheckResumability : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckResumability"_sd;
@@ -90,9 +89,7 @@ public:
return boost::none;
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const override;
static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -117,9 +114,5 @@ protected:
ResumeStatus _resumeStatus = ResumeStatus::kCheckNextDoc;
const ResumeTokenData _tokenFromClient;
-
-private:
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
index d3d62ac864a..26cdcb77181 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
@@ -35,11 +35,10 @@
namespace mongo {
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamCheckTopologyChange,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamCheckTopologyChange::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckTopologyChange,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamCheckTopologyChange::createFromBson,
+ true);
StageConstraints DocumentSourceChangeStreamCheckTopologyChange::constraints(
Pipeline::SplitState pipeState) const {
@@ -87,7 +86,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doG
return nextInput;
}
-Value DocumentSourceChangeStreamCheckTopologyChange::serializeLatest(
+Value DocumentSourceChangeStreamCheckTopologyChange::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(DOC(DocumentSourceChangeStream::kStageName
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h
index 1808cbe7995..7629e5d8063 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h
@@ -45,9 +45,7 @@ namespace mongo {
* that previously may not have held any data for the collection being watched, and they contain the
* information necessary for the mongoS to include the new shard in the merged change stream.
*/
-class DocumentSourceChangeStreamCheckTopologyChange final
- : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamCheckTopologyChange final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckTopologyChange"_sd;
@@ -69,9 +67,7 @@ public:
return boost::none;
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
private:
DocumentSourceChangeStreamCheckTopologyChange(
@@ -79,10 +75,6 @@ private:
: DocumentSource(kStageName, expCtx) {}
GetNextResult doGetNext() final;
-
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
deleted file mode 100644
index a7db8e647a0..00000000000
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
-
-namespace mongo {
-
-namespace {
-
-// Returns true if the given 'operationType' should invalidate the change stream based on the
-// namespace in 'pExpCtx'.
-bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
- StringData operationType) {
- if (pExpCtx->isSingleNamespaceAggregation()) {
- return operationType == DocumentSourceChangeStream::kDropCollectionOpType ||
- operationType == DocumentSourceChangeStream::kRenameCollectionOpType ||
- operationType == DocumentSourceChangeStream::kDropDatabaseOpType;
- } else if (!pExpCtx->isClusterAggregation()) {
- return operationType == DocumentSourceChangeStream::kDropDatabaseOpType;
- } else {
- return false;
- }
-};
-
-} // namespace
-
-DocumentSource::GetNextResult DocumentSourceChangeStreamCloseCursor::doGetNext() {
- // Close cursor if we have returned an invalidate entry.
- if (_shouldCloseCursor) {
- uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
- }
-
- auto nextInput = pSource->getNext();
- if (!nextInput.isAdvanced())
- return nextInput;
-
- auto doc = nextInput.getDocument();
- const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField;
- DocumentSourceChangeStream::checkValueType(
- doc[kOperationTypeField], kOperationTypeField, BSONType::String);
- auto operationType = doc[kOperationTypeField].getString();
- if (operationType == DocumentSourceChangeStream::kInvalidateOpType) {
- // Pass the invalidation forward, so that it can be included in the results, or
- // filtered/transformed by further stages in the pipeline, then throw an exception
- // to close the cursor on the next call to getNext().
- _shouldCloseCursor = true;
- }
-
- return nextInput;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
deleted file mode 100644
index 16a1e13f2d4..00000000000
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/change_stream_constants.h"
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_sort.h"
-
-namespace mongo {
-
-/**
- * This stage is used internally for change notifications to close cursor after returning
- * "invalidate" entries.
- * It is not intended to be created by the user.
- */
-class DocumentSourceChangeStreamCloseCursor final : public DocumentSource {
-public:
- static constexpr StringData kStageName = "$changeStream"_sd;
-
- const char* getSourceName() const final {
- // This is used in error reporting.
- return DocumentSourceChangeStreamCloseCursor::kStageName.rawData();
- }
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- // This stage should never be in the shards part of a split pipeline.
- invariant(pipeState != Pipeline::SplitState::kSplitForShards);
- return {StreamType::kStreaming,
- PositionRequirement::kNone,
- // If this is parsed on mongos it should stay on mongos. If we're not in a sharded
- // cluster then it's okay to run on mongod.
- HostTypeRequirement::kLocalOnly,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kNotAllowed,
- LookupRequirement::kNotAllowed,
- UnionRequirement::kNotAllowed,
- ChangeStreamRequirement::kChangeStreamStage};
- }
-
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- // We only serialize this stage in the context of explain.
- return explain ? Value(DOC(kStageName << Document())) : Value();
- }
-
- static boost::intrusive_ptr<DocumentSourceChangeStreamCloseCursor> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceChangeStreamCloseCursor(expCtx);
- }
-
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- return boost::none;
- }
-
-private:
- /**
- * Use the create static method to create a DocumentSourceChangeStreamCloseCursor.
- */
- DocumentSourceChangeStreamCloseCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(kStageName, expCtx) {}
-
- GetNextResult doGetNext() final;
-
- bool _shouldCloseCursor = false;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
index 4850f0e7290..d100b6ace48 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
@@ -76,10 +76,8 @@ StageConstraints DocumentSourceChangeStreamEnsureResumeTokenPresent::constraints
// pipelines, swaps can allow $match and 'DocumentSourceSingleDocumentTransformation' stages to
// execute on the shards, providing inter-node parallelism and potentially reducing the amount
// of data sent form each shard to the mongoS.
- if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
- constraints.canSwapWithMatch = true;
- constraints.canSwapWithSingleDocTransform = true;
- }
+ constraints.canSwapWithMatch = true;
+ constraints.canSwapWithSingleDocTransform = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
index 394d4bd0d13..8b3463f015b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
@@ -105,10 +105,8 @@ StageConstraints DocumentSourceChangeStreamHandleTopologyChange::constraints(
// Can be swapped with the '$match' and 'DocumentSourceSingleDocumentTransformation' stages and
// ensures that they get pushed down to the shards, as this stage bisects the change streams
// pipeline.
- if (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
- constraints.canSwapWithMatch = true;
- constraints.canSwapWithSingleDocTransform = true;
- }
+ constraints.canSwapWithMatch = true;
+ constraints.canSwapWithSingleDocTransform = true;
return constraints;
}
@@ -229,7 +227,7 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::replaceResumeTokenInComm
return newCmd.freeze().toBson();
}
-Value DocumentSourceChangeStreamHandleTopologyChange::serializeLatest(
+Value DocumentSourceChangeStreamHandleTopologyChange::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(DOC(DocumentSourceChangeStream::kStageName
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
index b50ae1000b6..c5d5a16fd93 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
@@ -46,9 +46,7 @@ namespace mongo {
* the first time. When this event is detected, this stage will establish a new cursor on that
* shard and add it to the cursors being merged.
*/
-class DocumentSourceChangeStreamHandleTopologyChange final
- : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamHandleTopologyChange final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamHandleTopologyChange"_sd;
@@ -63,9 +61,7 @@ public:
return kStageName.rawData();
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
StageConstraints constraints(Pipeline::SplitState) const final;
@@ -83,10 +79,6 @@ private:
GetNextResult doGetNext() final;
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
/**
* Establish the new cursors and tell the RouterStageMerge about them.
*/
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
index 8ac7ea35a09..6a634cc38a2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
@@ -36,11 +36,10 @@
namespace mongo {
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamOplogMatch,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamOplogMatch::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamOplogMatch,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamOplogMatch::createFromBson,
+ true);
namespace change_stream_filter {
/**
@@ -191,7 +190,7 @@ Pipeline::SourceContainer::iterator DocumentSourceChangeStreamOplogMatch::doOpti
return nextChangeStreamStageItr;
}
-Value DocumentSourceChangeStreamOplogMatch::serializeLatest(
+Value DocumentSourceChangeStreamOplogMatch::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
index 6207d46e376..5622e483569 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
@@ -36,8 +36,7 @@ namespace mongo {
* A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied
* on the oplog. The stage requires itself to be the first stage in the pipeline.
*/
-class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch {
public:
static constexpr StringData kStageName = "$_internalChangeStreamOplogMatch"_sd;
@@ -81,9 +80,7 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
protected:
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
@@ -102,9 +99,6 @@ private:
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
}
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
// Needed for re-creating the filter during optimization. Note that we do not serialize these
// fields. The filter in a serialized DocumentSourceOplogMatch is considered final, so there is
// no need to re-create it.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index d1e76617542..f18b2622a5f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -236,12 +236,7 @@ public:
ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedInvalidate);
// Then throw an exception on the next call of getNext().
- if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
- ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
- } else {
- ASSERT_THROWS(lastStage->getNext(),
- ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
- }
+ ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
}
}
@@ -440,58 +435,10 @@ public:
};
checkTransformation(deltaOplog, expectedUpdateField);
}
-};
-
-bool getCSOptimizationFeatureFlagValue() {
- return feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV();
-}
-
-bool getCSRewriteFeatureFlagValue() {
- return feature_flags::gFeatureFlagChangeStreamsRewrite.isEnabledAndIgnoreFCV();
-}
-
-bool isChangeStreamPreAndPostImagesEnabled() {
- return feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabledAndIgnoreFCV();
-}
-
-/**
- * Runs the tests with feature flag 'featureFlagChangeStreamsOptimization' true and false.
- */
-class ChangeStreamStageWithDualFeatureFlagValueTest : public ChangeStreamStageTest {
-public:
- ChangeStreamStageWithDualFeatureFlagValueTest() : ChangeStreamStageTest() {}
-
-
- void run() {
- {
- RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsOptimization",
- true);
- ASSERT(getCSOptimizationFeatureFlagValue());
- ChangeStreamStageTest::run();
- }
- {
- RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsOptimization",
- false);
- ASSERT_FALSE(getCSOptimizationFeatureFlagValue());
- ChangeStreamStageTest::run();
- }
- }
-};
-
-class ChangeStreamPipelineOptimizationTest : public ChangeStreamStageTest {
-public:
- explicit ChangeStreamPipelineOptimizationTest() : ChangeStreamStageTest() {}
-
- void run() {
- RAIIServerParameterControllerForTest controllerOptimization(
- "featureFlagChangeStreamsOptimization", true);
- ASSERT(getCSOptimizationFeatureFlagValue());
- RAIIServerParameterControllerForTest controllerRewrite("featureFlagChangeStreamsRewrite",
- true);
- ASSERT(getCSOptimizationFeatureFlagValue());
- ChangeStreamStageTest::run();
- }
+ /**
+ * Helper to create change stream pipeline for testing.
+ */
std::unique_ptr<Pipeline, PipelineDeleter> buildTestPipeline(
const std::vector<BSONObj>& rawPipeline) {
auto expCtx = getExpCtx();
@@ -504,6 +451,9 @@ public:
return pipeline;
}
+ /**
+ * Helper to verify if the change stream pipeline contains expected stages.
+ */
void assertStagesNameOrder(std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
const std::vector<std::string> expectedStages) {
ASSERT_EQ(pipeline->getSources().size(), expectedStages.size());
@@ -519,6 +469,14 @@ public:
}
};
+bool getCSRewriteFeatureFlagValue() {
+ return feature_flags::gFeatureFlagChangeStreamsRewrite.isEnabledAndIgnoreFCV();
+}
+
+bool isChangeStreamPreAndPostImagesEnabled() {
+ return feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabledAndIgnoreFCV();
+}
+
TEST_F(ChangeStreamStageTest, ShouldRejectNonObjectArg) {
auto expCtx = getExpCtx();
@@ -2186,12 +2144,8 @@ TEST_F(ChangeStreamStageTest, MatchFiltersNoOp) {
checkTransformation(noOp, boost::none);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
- TransformationShouldBeAbleToReParseSerializedStage) {
+TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage) {
auto expCtx = getExpCtx();
- const auto featureFlag = getCSOptimizationFeatureFlagValue();
- const auto serializedStageName =
- featureFlag ? DocumentSourceChangeStreamTransform::kStageName : DSChangeStream::kStageName;
DocumentSourceChangeStreamSpec spec;
spec.setStartAtOperationTime(kDefaultTs);
@@ -2201,8 +2155,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
- const size_t changeStreamStageSize = featureFlag ? 5 : 6;
- ASSERT_EQ(allStages.size(), changeStreamStageSize);
+ ASSERT_EQ(allStages.size(), 5);
auto stage = allStages[2];
ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get()));
@@ -2215,8 +2168,9 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
ASSERT_EQ(serialization.size(), 1UL);
ASSERT_EQ(serialization[0].getType(), BSONType::Object);
auto serializedDoc = serialization[0].getDocument();
- ASSERT_BSONOBJ_EQ(serializedDoc[serializedStageName].getDocument().toBson(),
- originalSpec[""].Obj());
+ ASSERT_BSONOBJ_EQ(
+ serializedDoc[DocumentSourceChangeStreamTransform::kStageName].getDocument().toBson(),
+ originalSpec[""].Obj());
//
// Create a new stage from the serialization. Serialize the new stage and confirm that it is
@@ -2227,26 +2181,15 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx);
auto newSerialization = roundTripped->serialize();
- // When optimiziation is enabled, we should serialize all the internal stages.
- if (featureFlag) {
- ASSERT_EQ(newSerialization.size(), 5UL);
+ ASSERT_EQ(newSerialization.size(), 5UL);
- // DSCSTransform stage should be the third stage after DSCSOplogMatch and
- // DSCSUnwindTransactions stages.
- ASSERT_VALUE_EQ(newSerialization[2], serialization[0]);
- } else {
- ASSERT_EQ(newSerialization.size(), 1UL);
- ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
- }
+ // DSCSTransform stage should be the third stage after DSCSOplogMatch and
+ // DSCSUnwindTransactions stages.
+ ASSERT_VALUE_EQ(newSerialization[2], serialization[0]);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
- DSCSTransformStageEmptySpecSerializeResumeAfter) {
+TEST_F(ChangeStreamStageTest, DSCSTransformStageEmptySpecSerializeResumeAfter) {
auto expCtx = getExpCtx();
- auto featureFlag = getCSOptimizationFeatureFlagValue();
- const auto serializedStageName =
- featureFlag ? DocumentSourceChangeStreamTransform::kStageName : DSChangeStream::kStageName;
-
auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj());
// Verify that the 'initialPostBatchResumeToken' is populated while parsing.
@@ -2271,18 +2214,13 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
ASSERT_EQ(serialization.size(), 1UL);
ASSERT_EQ(serialization[0].getType(), BSONType::Object);
ASSERT(!serialization[0]
- .getDocument()[serializedStageName]
- .getDocument()[featureFlag
- ? DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
- : DocumentSourceChangeStreamSpec::kResumeAfterFieldName]
+ .getDocument()[DocumentSourceChangeStreamTransform::kStageName]
+ .getDocument()[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName]
.missing());
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSTransformStageWithResumeTokenSerialize) {
+TEST_F(ChangeStreamStageTest, DSCSTransformStageWithResumeTokenSerialize) {
auto expCtx = getExpCtx();
- const auto serializedStageName = getCSOptimizationFeatureFlagValue()
- ? DocumentSourceChangeStreamTransform::kStageName
- : DSChangeStream::kStageName;
DocumentSourceChangeStreamSpec spec;
spec.setResumeAfter(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid())));
@@ -2303,7 +2241,10 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSTransformStageWithResu
stage->serializeToArray(serialization);
ASSERT_EQ(serialization.size(), 1UL);
ASSERT_EQ(serialization[0].getType(), BSONType::Object);
- ASSERT_BSONOBJ_EQ(serialization[0].getDocument()[serializedStageName].getDocument().toBson(),
+ ASSERT_BSONOBJ_EQ(serialization[0]
+ .getDocument()[DocumentSourceChangeStreamTransform::kStageName]
+ .getDocument()
+ .toBson(),
originalSpec[""].Obj());
}
@@ -2313,17 +2254,14 @@ void validateDocumentSourceStageSerialization(
auto stage = Stage::createFromBson(specAsBSON.firstElement(), expCtx);
vector<Value> serialization;
stage->serializeToArray(serialization);
- if (getCSOptimizationFeatureFlagValue()) {
- ASSERT_EQ(serialization.size(), 1UL);
- ASSERT_EQ(serialization[0].getType(), BSONType::Object);
- ASSERT_BSONOBJ_EQ(serialization[0].getDocument().toBson(),
- BSON(Stage::kStageName << spec.toBSON()));
- } else {
- ASSERT(serialization.empty());
- }
+
+ ASSERT_EQ(serialization.size(), 1UL);
+ ASSERT_EQ(serialization[0].getType(), BSONType::Object);
+ ASSERT_BSONOBJ_EQ(serialization[0].getDocument().toBson(),
+ BSON(Stage::kStageName << spec.toBSON()));
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSOplogMatchStageSerialization) {
+TEST_F(ChangeStreamStageTest, DSCSOplogMatchStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamOplogMatchSpec spec;
@@ -2335,7 +2273,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSOplogMatchStageSeriali
std::move(spec), stageSpecAsBSON, expCtx);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSUnwindTransactionStageSerialization) {
+TEST_F(ChangeStreamStageTest, DSCSUnwindTransactionStageSerialization) {
auto expCtx = getExpCtx();
auto filter = BSON("ns" << BSON("$regex"
@@ -2347,7 +2285,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSUnwindTransactionStage
std::move(spec), stageSpecAsBSON, expCtx);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSCheckInvalidateStageSerialization) {
+TEST_F(ChangeStreamStageTest, DSCSCheckInvalidateStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamCheckInvalidateSpec spec;
@@ -2359,7 +2297,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSCheckInvalidateStageSe
std::move(spec), stageSpecAsBSON, expCtx);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSResumabilityStageSerialization) {
+TEST_F(ChangeStreamStageTest, DSCSResumabilityStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamCheckResumabilitySpec spec;
@@ -2370,7 +2308,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSResumabilityStageSeria
std::move(spec), stageSpecAsBSON, expCtx);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePreImageStageSerialization) {
+TEST_F(ChangeStreamStageTest, DSCSLookupChangePreImageStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamAddPreImageSpec spec(FullDocumentBeforeChangeModeEnum::kRequired);
@@ -2380,7 +2318,7 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePreImageSt
std::move(spec), stageSpecAsBSON, expCtx);
}
-TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSLookupChangePostImageStageSerialization) {
+TEST_F(ChangeStreamStageTest, DSCSLookupChangePostImageStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamAddPostImageSpec spec(FullDocumentModeEnum::kUpdateLookup);
@@ -2417,11 +2355,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate);
// Then throw an exception on the next call of getNext().
- if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
- ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
- } else {
- ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
- }
+ ASSERT_THROWS(lastStage->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
}
TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) {
@@ -2433,11 +2367,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
match->setSource(lastStage.get());
// Throw an exception on the call of getNext().
- if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
- ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
- } else {
- ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
- }
+ ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
}
TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
@@ -3458,7 +3388,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleMatch) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) {
//
// Tests that the single '$match' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3480,7 +3410,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleMatch) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatch) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatch) {
//
// Tests that multiple '$match' gets merged and promoted before the
// '$_internalUpdateOnAddShard'.
@@ -3502,7 +3432,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatch) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatchAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) {
//
// Tests that multiple '$match' gets merged and promoted before the
// '$_internalUpdateOnAddShard' if resume token if present.
@@ -3530,7 +3460,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleMatchAndRes
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleProject) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleProject) {
//
// Tests that the single'$project' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3550,7 +3480,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleProject) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProject) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProject) {
//
// Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3572,7 +3502,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProject) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProjectAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) {
//
// Tests that multiple '$project' gets promoted before the '$_internalUpdateOnAddShard' if
// resume token is present.
@@ -3599,7 +3529,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleProjectAndR
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithProjectMatchAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) {
//
// Tests that a '$project' followed by a '$match' gets optimized and they get promoted before
// the '$_internalUpdateOnAddShard'.
@@ -3627,7 +3557,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithProjectMatchAndResu
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleUnset) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleUnset) {
//
// Tests that the single'$unset' gets promoted before the '$_internalUpdateOnAddShard' as
// '$project'.
@@ -3648,7 +3578,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleUnset) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleUnset) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleUnset) {
//
// Tests that multiple '$unset' gets promoted before the '$_internalUpdateOnAddShard' as
// '$project'.
@@ -3671,7 +3601,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleUnset) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithUnsetAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) {
//
// Tests that the '$unset' gets promoted before the '$_internalUpdateOnAddShard' as '$project'
// even if resume token is present.
@@ -3697,7 +3627,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithUnsetAndResumeToken
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleAddFields) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleAddFields) {
//
// Tests that the single'$addFields' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3717,7 +3647,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleAddFields) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleAddFields) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleAddFields) {
//
// Tests that multiple '$addFields' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3739,7 +3669,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleAddFields)
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAddFieldsAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) {
//
// Tests that the '$addFields' gets promoted before the '$_internalUpdateOnAddShard' if
// resume token is present.
@@ -3764,7 +3694,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAddFieldsAndResumeT
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleSet) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleSet) {
//
// Tests that the single'$set' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3784,7 +3714,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleSet) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleSet) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleSet) {
//
// Tests that multiple '$set' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3806,7 +3736,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithMultipleSet) {
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSetAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) {
//
// Tests that the '$set' gets promoted before the '$_internalUpdateOnAddShard' if
// resume token is present.
@@ -3831,7 +3761,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSetAndResumeToken)
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceRoot) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceRoot) {
//
// Tests that the single'$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard'.
//
@@ -3851,7 +3781,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceRoot)
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceRootAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) {
//
// Tests that the '$replaceRoot' gets promoted before the '$_internalUpdateOnAddShard' if
// resume token is present.
@@ -3877,7 +3807,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceRootAndResum
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceWith) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleReplaceWith) {
//
// Tests that the single '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' as
// '$replaceRoot'.
@@ -3898,7 +3828,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithSingleReplaceWith)
"$_internalChangeStreamHandleTopologyChange"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceWithAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) {
//
// Tests that the '$replaceWith' gets promoted before the '$_internalUpdateOnAddShard' if
// resume token is present as '$replaceRoot'.
@@ -3924,7 +3854,7 @@ TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithReplaceWithAndResum
"$_internalChangeStreamEnsureResumeTokenPresent"});
}
-TEST_F(ChangeStreamPipelineOptimizationTest, ChangeStreamWithAllStagesAndResumeToken) {
+TEST_F(ChangeStreamStageTest, ChangeStreamWithAllStagesAndResumeToken) {
//
// Tests that when all allowed stages are included along with the resume token, the final
// pipeline gets optimized.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index e03c237c56e..0e43a47ef0d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -65,11 +65,10 @@ namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
} // namespace
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamTransform,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamTransform::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamTransform::createFromBson,
+ true);
intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -444,7 +443,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
return doc.freeze();
}
-Value DocumentSourceChangeStreamTransform::serializeLatest(
+Value DocumentSourceChangeStreamTransform::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(Document{{DocumentSourceChangeStream::kStageName,
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 4ee2de16d1c..51d3fdc793c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -33,8 +33,7 @@
namespace mongo {
-class DocumentSourceChangeStreamTransform : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamTransform : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd;
@@ -54,9 +53,7 @@ public:
DocumentSource::GetModPathsReturn getModifiedPaths() const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
@@ -97,9 +94,6 @@ private:
*/
ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey, Value txnOpIndex);
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const final;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const final;
-
DocumentSourceChangeStreamSpec _changeStreamSpec;
// Map of collection UUID to document key fields.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index 4ff928d5970..d7f646c8d10 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -39,11 +39,10 @@
namespace mongo {
-REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamUnwindTransaction,
- LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamUnwindTransaction::createFromBson,
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamUnwindTransaction,
+ LiteParsedDocumentSourceChangeStreamInternal::parse,
+ DocumentSourceChangeStreamUnwindTransaction::createFromBson,
+ true);
boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction>
DocumentSourceChangeStreamUnwindTransaction::create(
@@ -88,7 +87,7 @@ StageConstraints DocumentSourceChangeStreamUnwindTransaction::constraints(
ChangeStreamRequirement::kChangeStreamStage);
}
-Value DocumentSourceChangeStreamUnwindTransaction::serializeLatest(
+Value DocumentSourceChangeStreamUnwindTransaction::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
tassert(5467604, "expression has not been initialized", _expression);
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
index ef48340d331..a2659178d6b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
@@ -41,8 +41,7 @@ namespace mongo {
* output, but all other entries pass through unmodified. Note that the namespace filter applies
* only to unwound transaction operations, not to any other entries.
*/
-class DocumentSourceChangeStreamUnwindTransaction : public DocumentSource,
- public ChangeStreamStageSerializationInterface {
+class DocumentSourceChangeStreamUnwindTransaction : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalChangeStreamUnwindTransaction"_sd;
@@ -56,13 +55,7 @@ public:
DocumentSource::GetModPathsReturn getModifiedPaths() const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- return ChangeStreamStageSerializationInterface::serializeToValue(explain);
- }
-
- Value serializeLegacy(boost::optional<ExplainOptions::Verbosity> explain) const;
- Value serializeLatest(boost::optional<ExplainOptions::Verbosity> explain) const;
-
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 08dac82eaed..45f4386637d 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -75,8 +75,8 @@ using std::vector;
const NamespaceString kTestNss = NamespaceString("a.collection");
-size_t getChangeStreamStageSize() {
- return (feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() ? 6 : 6);
+constexpr size_t getChangeStreamStageSize() {
+ return 6;
}
void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) {