summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArun Banala <arun.banala@mongodb.com>2021-05-12 14:11:44 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-31 18:56:08 +0000
commit3cf2bc8bfc833f9d2b044b2b3da611819409cc8d (patch)
treeb3c15657dec4570e7927a45b5d9c5066d019ecea /src
parent5aeade8f1ed4a895f935dc7c5dbec506f837d63d (diff)
downloadmongo-3cf2bc8bfc833f9d2b044b2b3da611819409cc8d.tar.gz
SERVER-56669 Refactor change streams stage building logic
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/SConscript4
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp147
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp141
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h18
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp16
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.h17
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp (renamed from src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp)18
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h (renamed from src/mongo/db/pipeline/document_source_change_stream_topology_change.h)13
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp (renamed from src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp)20
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h (renamed from src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h)8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.h4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp38
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp90
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h8
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp23
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp2
22 files changed, 351 insertions, 269 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 9c167fb3b3b..dea32771712 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -343,15 +343,15 @@ env.Library(
'document_source_change_stream.cpp',
'document_source_change_stream_check_invalidate.cpp',
'document_source_change_stream_check_resumability.cpp',
+ 'document_source_change_stream_check_topology_change.cpp',
'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_ensure_resume_token_present.cpp',
+ 'document_source_change_stream_handle_topology_change.cpp',
'document_source_change_stream_lookup_post_image.cpp',
'document_source_change_stream_lookup_pre_image.cpp',
'document_source_change_stream_oplog_match.cpp',
- 'document_source_change_stream_topology_change.cpp',
'document_source_change_stream_transform.cpp',
'document_source_change_stream_unwind_transactions.cpp',
- 'document_source_change_stream_update_on_add_shard.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/pipeline/pipeline',
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 65baaeb3e6b..0fff6070ba3 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -33,144 +33,79 @@
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
+#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
-#include "mongo/db/pipeline/document_source_change_stream_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h"
-#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
#include "mongo/db/pipeline/expression.h"
namespace mongo {
namespace change_stream_legacy {
std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceChangeStreamSpec spec,
- BSONElement rawSpec) {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) {
std::list<boost::intrusive_ptr<DocumentSource>> stages;
- boost::intrusive_ptr<DocumentSource> resumeStage = nullptr;
- boost::optional<ResumeTokenData> startAfterInvalidate;
- bool showMigrationEvents = spec.getShowMigrationEvents();
- uassert(31123,
- "Change streams from mongos may not show migration events.",
- !(expCtx->inMongos && showMigrationEvents));
-
- auto resumeAfter = spec.getResumeAfter();
- auto startAfter = spec.getStartAfter();
- if (resumeAfter || startAfter) {
- uassert(50865,
- "Do not specify both 'resumeAfter' and 'startAfter' in a $changeStream stage",
- !startAfter || !resumeAfter);
-
- const ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get();
- const ResumeTokenData tokenData = token.getData();
-
- // If resuming from an "invalidate" using "startAfter", pass along the resume token data to
- // DocumentSourceChangeStreamCheckInvalidate to signify that another invalidate should not
- // be generated.
- if (startAfter && tokenData.fromInvalidate) {
- startAfterInvalidate = tokenData;
- }
- uassert(ErrorCodes::InvalidResumeToken,
- "Attempting to resume a change stream using 'resumeAfter' is not allowed from an "
- "invalidate notification.",
- !resumeAfter || !tokenData.fromInvalidate);
-
- // If we are resuming a single-collection stream, the resume token should always contain a
- // UUID unless the token is a high water mark.
- uassert(ErrorCodes::InvalidResumeToken,
- "Attempted to resume a single-collection stream, but the resume token does not "
- "include a UUID.",
- tokenData.uuid || !expCtx->isSingleNamespaceAggregation() ||
- ResumeToken::isHighWaterMarkToken(tokenData));
-
- // For a regular resume token, we must ensure that (1) all shards are capable of resuming
- // from the given clusterTime, and (2) that we observe the resume token event in the stream
- // before any event that would sort after it. High water mark tokens, however, do not refer
- // to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'.
- if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) {
- resumeStage = DocumentSourceChangeStreamCheckResumability::create(expCtx, tokenData);
- } else {
- resumeStage =
- DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, tokenData);
- }
- }
+ const auto userRequestedResumePoint =
+ spec.getResumeAfter() || spec.getStartAfter() || spec.getStartAtOperationTime();
- // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'.
- if (auto startAtOperationTime = spec.getStartAtOperationTime()) {
- uassert(40674,
- "Only one type of resume option is allowed, but multiple were found.",
- !resumeStage);
- resumeStage =
- DocumentSourceChangeStreamCheckResumability::create(expCtx, *startAtOperationTime);
+ if (!userRequestedResumePoint) {
+ // Make sure we update the 'resumeAfter' in the 'spec' so that we serialize the
+ // correct resume token when sending it to the shards.
+ spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken(
+ DocumentSourceChangeStream::getStartTimeForNewStream(expCtx)));
}
- auto transformStage = DocumentSourceChangeStreamTransform::createFromBson(rawSpec, expCtx);
+ // Unfold the $changeStream into its constituent stages and add them to the pipeline.
+ stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec));
+ stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, spec));
tassert(5467606,
"'DocumentSourceChangeStreamTransform' stage should populate "
"'initialPostBatchResumeToken' field",
!expCtx->initialPostBatchResumeToken.isEmpty());
- // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies
- // upon the fact that it is always the first stage in the pipeline.
- stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, showMigrationEvents));
-
- stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
- stages.push_back(transformStage);
-
- const bool csOptFeatureFlag =
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV();
-
- // The 'DocumentSourceChangeStreamTopologyChange' only runs in a cluster, and will be dispatched
- // by mongoS to the shards.
- if (csOptFeatureFlag && expCtx->inMongos) {
- stages.push_back(DocumentSourceChangeStreamTopologyChange::create(expCtx));
- }
-
// The resume stage must come after the check invalidate stage so that the former can determine
// whether the event that matches the resume token should be followed by an "invalidate" event.
- stages.push_back(
- DocumentSourceChangeStreamCheckInvalidate::create(expCtx, startAfterInvalidate));
-
- // The resume stage 'DocumentSourceChangeStreamCheckResumability' should come before the split
- // point stage 'DocumentSourceChangeStreamUpdateOnAddShard'.
- if (resumeStage &&
- resumeStage->getSourceName() == DocumentSourceChangeStreamCheckResumability::kStageName) {
- stages.push_back(resumeStage);
- resumeStage.reset();
+ stages.push_back(DocumentSourceChangeStreamCheckInvalidate::create(expCtx, spec));
+
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+
+ // If the user-requested resume point is a high water mark, or if we are running on the shards
+ // in a cluster, we must include a DSCSCheckResumability stage.
+ if (expCtx->needsMerge ||
+ (userRequestedResumePoint && ResumeToken::isHighWaterMarkToken(resumeToken))) {
+ stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec));
}
- // If the pipeline is build on MongoS, then the stage
- // 'DocumentSourceChangeStreamUpdateOnAddShard' acts as the split point for the pipline. All
- // stages before this stages will run on shards and all stages after and inclusive of this stage
- // will run on the MongoS.
+ // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as
+ // the split point for the pipline. All stages before this stages will run on shards and all
+ // stages after and inclusive of this stage will run on the MongoS.
if (expCtx->inMongos) {
- stages.push_back(DocumentSourceChangeStreamUpdateOnAddShard::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx));
}
- // This resume stage should be 'DocumentSourceChangeStreamEnsureResumeTokenPresent'.
- if (resumeStage) {
- stages.push_back(resumeStage);
+ // If the resume token is from an event, we must include a DSCSEnsureResumeTokenPresent stage.
+ // In a cluster, this will be on mongoS and should not be generated on the shards.
+ if (!expCtx->needsMerge && !ResumeToken::isHighWaterMarkToken(resumeToken)) {
+ stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec));
}
if (!expCtx->needsMerge) {
- if (!csOptFeatureFlag) {
- // There should only be one close cursor stage. If we're on the shards and producing
- // input to be merged, do not add a close cursor stage, since the mongos will already
- // have one.
- stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx));
- }
-
- // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
- // so that any $match stages which follow the $changeStream pipeline prefix may be able to
- // skip ahead of the DSLPreImage stage. This allows a whole-db or whole-cluster stream to
- // run on an instance where only some collections have pre-images enabled, so long as the
- // user filters for only those namespaces.
+ // There should only be one close cursor stage. If we're on the shards and producing input
+ // to be merged, do not add a close cursor stage, since the mongos will already have one.
+ stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx));
+
+ // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage
+ // here so that any $match stages which follow the $changeStream pipeline prefix may be
+ // able to skip ahead of the DSCSLookupPreImage stage. This allows a whole-db or
+ // whole-cluster stream to run on an instance where only some collections have pre-images
+ // enabled, so long as the user filters for only those namespaces.
// TODO SERVER-36941: figure out how to get this to work in a sharded cluster.
if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) {
invariant(!expCtx->inMongos);
@@ -240,7 +175,7 @@ Value DocumentSourceChangeStreamLookupPostImage::serializeLegacy(
return (explain ? Value{Document{{kStageName, Document()}}} : Value());
}
-Value DocumentSourceChangeStreamTopologyChange::serializeLegacy(
+Value DocumentSourceChangeStreamCheckTopologyChange::serializeLegacy(
boost::optional<ExplainOptions::Verbosity> explain) const {
return (explain ? Value{Document{{kStageName, Document()}}} : Value());
}
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index e6c29e0ec90..4c790782a20 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -38,7 +38,6 @@ namespace mongo::change_stream_legacy {
*/
std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceChangeStreamSpec spec,
- BSONElement rawSpec);
+ const DocumentSourceChangeStreamSpec spec);
} // namespace mongo::change_stream_legacy
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 94c6f58adf0..c4de7837030 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -40,12 +40,15 @@
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h"
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
+#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h"
+#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h"
#include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h"
+#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h"
-#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
@@ -151,10 +154,41 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
}
}
+ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec(
+ const DocumentSourceChangeStreamSpec& spec) {
+ if (spec.getStartAfter()) {
+ return spec.getStartAfter()->getData();
+ } else if (spec.getResumeAfter()) {
+ return spec.getResumeAfter()->getData();
+ } else if (spec.getStartAtOperationTime()) {
+ return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime()).getData();
+ }
+ tasserted(5666901,
+ "Expected one of 'startAfter', 'resumeAfter' or 'startAtOperationTime' to be "
+ "populated in $changeStream spec");
+}
+
+Timestamp DocumentSourceChangeStream::getStartTimeForNewStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ // If we do not have an explicit starting point, we should start from the latest majority
+ // committed operation. If we are on mongoS and do not have a starting point, set it to the
+ // current clusterTime so that all shards start in sync.
+ auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
+ const auto currentTime =
+ !expCtx->inMongos ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} : [&] {
+ const auto currentTime = VectorClock::get(expCtx->opCtx)->getTime();
+ return currentTime.clusterTime();
+ }();
+
+ // We always start one tick beyond the most recent operation, to ensure that the stream does not
+ // return it.
+ return currentTime.addTicks(1).asTimestamp();
+}
+
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
uassert(50808,
- "$changeStream stage expects a document as argument.",
+ "$changeStream stage expects a document as argument",
elem.type() == BSONType::Object);
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
@@ -163,7 +197,79 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// Make sure that it is legal to run this $changeStream before proceeding.
DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec);
- return change_stream_legacy::buildPipeline(expCtx, spec, elem);
+ // If we see this stage on a shard, it means that the raw $changeStream stage was dispatched to
+ // us from an old mongoS. Build a legacy shard pipeline.
+ if (expCtx->needsMerge ||
+ !feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
+ return change_stream_legacy::buildPipeline(expCtx, spec);
+ }
+ return _buildPipeline(expCtx, spec);
+}
+
+std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_buildPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) {
+ std::list<boost::intrusive_ptr<DocumentSource>> stages;
+
+ // If the user did not specify an explicit starting point, set it to the current time.
+ if (!spec.getResumeAfter() && !spec.getStartAfter() && !spec.getStartAtOperationTime()) {
+ // Make sure we update the 'startAtOperationTime' in the 'spec' so that we serialize the
+ // correct start point when sending it to the shards.
+ spec.setStartAtOperationTime(DocumentSourceChangeStream::getStartTimeForNewStream(expCtx));
+ }
+
+ // Obtain the resume token from the spec. This will be used when building the pipeline.
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+
+ // Unfold the $changeStream into its constituent stages and add them to the pipeline.
+ stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec));
+ stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, spec));
+ tassert(5666900,
+ "'DocumentSourceChangeStreamTransform' stage should populate "
+ "'initialPostBatchResumeToken' field",
+ !expCtx->initialPostBatchResumeToken.isEmpty());
+
+ // The resume stage must come after the check invalidate stage so that the former can determine
+ // whether the event that matches the resume token should be followed by an "invalidate" event.
+ stages.push_back(DocumentSourceChangeStreamCheckInvalidate::create(expCtx, spec));
+
+ // If the starting point is a high water mark, or if we will be splitting the pipeline for
+ // dispatch to the shards in a cluster, we must include a DSCSCheckResumability stage.
+ if (expCtx->inMongos || ResumeToken::isHighWaterMarkToken(resumeToken)) {
+ stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec));
+ }
+
+ // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as the
+ // split point for the pipline. All stages before this stages will run on shards and all stages
+ // after and inclusive of this stage will run on the MongoS.
+ if (expCtx->inMongos) {
+ stages.push_back(DocumentSourceChangeStreamCheckTopologyChange::create(expCtx));
+ stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx));
+ }
+
+ // If the resume point is an event, we must include a DSCSEnsureResumeTokenPresent stage.
+ if (!ResumeToken::isHighWaterMarkToken(resumeToken)) {
+ stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec));
+ }
+
+ // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
+ // so that any $match stages which follow the $changeStream pipeline prefix may be able to
+ // skip ahead of the DSCSLookupPreImage stage. This allows a whole-db or whole-cluster stream to
+ // run on an instance where only some collections have pre-images enabled, so long as the
+ // user filters for only those namespaces.
+ // TODO SERVER-36941: figure out how to get this to work in a sharded cluster.
+ if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) {
+ invariant(!expCtx->inMongos);
+ stages.push_back(DocumentSourceChangeStreamLookupPreImage::create(expCtx, spec));
+ }
+
+ // There should be only one post-image lookup stage. If we're on the shards and producing
+ // input to be merged, the lookup is done on the mongos.
+ if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) {
+ stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx));
+ }
+
+ return stages;
}
void DocumentSourceChangeStream::assertIsLegalSpecification(
@@ -212,6 +318,35 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
uassert(51771,
"the 'fullDocumentBeforeChange' option is not supported in a sharded cluster",
!(shouldLookupPreImage && (expCtx->inMongos || expCtx->needsMerge)));
+
+ uassert(31123,
+ "Change streams from mongos may not show migration events",
+ !(expCtx->inMongos && spec.getShowMigrationEvents()));
+
+ uassert(50865,
+ "Do not specify both 'resumeAfter' and 'startAfter' in a $changeStream stage",
+ !spec.getResumeAfter() || !spec.getStartAfter());
+
+ auto resumeToken = (spec.getResumeAfter() || spec.getStartAfter())
+ ? resolveResumeTokenFromSpec(spec)
+ : boost::optional<ResumeTokenData>();
+
+ uassert(40674,
+ "Only one type of resume option is allowed, but multiple were found",
+ !(spec.getStartAtOperationTime() && resumeToken));
+
+ uassert(ErrorCodes::InvalidResumeToken,
+ "Attempting to resume a change stream using 'resumeAfter' is not allowed from an "
+ "invalidate notification",
+ !(spec.getResumeAfter() && resumeToken->fromInvalidate));
+
+ // If we are resuming a single-collection stream, the resume token should always contain a
+ // UUID unless the token is a high water mark.
+ uassert(ErrorCodes::InvalidResumeToken,
+ "Attempted to resume a single-collection stream, but the resume token does not "
+ "include a UUID",
+ !resumeToken || resumeToken->uuid || !expCtx->isSingleNamespaceAggregation() ||
+ ResumeToken::isHighWaterMarkToken(*resumeToken));
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 144dddf84d4..29b67fe5262 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -187,7 +187,25 @@ public:
*/
static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType);
+ /**
+ * Extracts the resume token from the given spec. If a 'startAtOperationTime' is specified,
+ * returns the equivalent high-watermark token. This method should only ever be called on a spec
+ * where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated.
+ */
+ static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec);
+
+ /**
+ * For a change stream with no resume information supplied by the user, returns the clusterTime
+ * at which the new stream should begin scanning the oplog.
+ */
+ static Timestamp getStartTimeForNewStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
private:
+ // Constructs and returns a series of stages representing the full change stream pipeline.
+ static std::list<boost::intrusive_ptr<DocumentSource>> _buildPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec);
+
// Helper function which throws if the $changeStream fails any of a series of semantic checks.
// For instance, whether it is permitted to run given the current FCV, whether the namespace is
// valid for the options specified in the spec, etc.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
index a5ee091e326..10b60f55307 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
@@ -66,6 +66,17 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt
} // namespace
boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate>
+DocumentSourceChangeStreamCheckInvalidate::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec) {
+ // If resuming from an "invalidate" using "startAfter", pass along the resume token data to
+ // DSCSCheckInvalidate to signify that another invalidate should not be generated.
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ return new DocumentSourceChangeStreamCheckInvalidate(
+ expCtx, boost::make_optional(resumeToken.fromInvalidate, std::move(resumeToken)));
+}
+
+boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate>
DocumentSourceChangeStreamCheckInvalidate::createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467602,
@@ -77,9 +88,8 @@ DocumentSourceChangeStreamCheckInvalidate::createFromBson(
spec.embeddedObject());
return new DocumentSourceChangeStreamCheckInvalidate(
expCtx,
- parsed.getStartAfterInvalidate()
- ? boost::optional<ResumeTokenData>(parsed.getStartAfterInvalidate()->getData())
- : boost::none);
+ parsed.getStartAfterInvalidate() ? parsed.getStartAfterInvalidate()->getData()
+ : boost::optional<ResumeTokenData>());
}
DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNext() {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
index 92647a6b48d..30dc1b6eeda 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h
@@ -72,12 +72,10 @@ public:
static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- boost::optional<ResumeTokenData> startAfterInvalidate) {
- return new DocumentSourceChangeStreamCheckInvalidate(expCtx,
- std::move(startAfterInvalidate));
- }
+ const DocumentSourceChangeStreamSpec& spec);
private:
/**
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index ffb7ff6cd72..c22ea8d5c7d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -179,15 +179,23 @@ DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResu
intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
- Timestamp ts) {
- // We are resuming from a point in time, not an event. Seed the stage with a high water mark.
- return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData());
+ const DocumentSourceChangeStreamSpec& spec) {
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(resumeToken));
}
intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
-DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
- ResumeTokenData token) {
- return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(token));
+DocumentSourceChangeStreamCheckResumability::createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(5467603,
+ str::stream() << "the '" << kStageName << "' object spec must be an object",
+ spec.type() == Object);
+
+ auto parsed = DocumentSourceChangeStreamCheckResumabilitySpec::parse(
+ IDLParserErrorContext("DocumentSourceChangeStreamCheckResumabilitySpec"),
+ spec.embeddedObject());
+ return new DocumentSourceChangeStreamCheckResumability(expCtx,
+ parsed.getResumeToken().getData());
}
const char* DocumentSourceChangeStreamCheckResumability::getSourceName() const {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
index 9cbed08490a..21fad097655 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h
@@ -95,22 +95,11 @@ public:
}
static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createFromBson(
- BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- uassert(5467603,
- str::stream() << "the '" << kStageName << "' object spec must be an object",
- spec.type() == Object);
-
- auto parsed = DocumentSourceChangeStreamCheckResumabilitySpec::parse(
- IDLParserErrorContext("DocumentSourceChangeStreamCheckResumabilitySpec"),
- spec.embeddedObject());
- return create(expCtx, parsed.getResumeToken().getData());
- }
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx);
static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts);
-
- static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
protected:
/**
diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
index ce46192ac4a..724f6c1338f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
@@ -29,19 +29,19 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/document_source_change_stream_topology_change.h"
+#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
#include "mongo/db/pipeline/change_stream_topology_change_info.h"
namespace mongo {
REGISTER_INTERNAL_DOCUMENT_SOURCE(
- _internalChangeStreamTopologyChange,
+ _internalChangeStreamCheckTopologyChange,
LiteParsedDocumentSourceChangeStreamInternal::parse,
- DocumentSourceChangeStreamTopologyChange::createFromBson,
+ DocumentSourceChangeStreamCheckTopologyChange::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
-StageConstraints DocumentSourceChangeStreamTopologyChange::constraints(
+StageConstraints DocumentSourceChangeStreamCheckTopologyChange::constraints(
Pipeline::SplitState pipeState) const {
return {StreamType::kStreaming,
PositionRequirement::kNone,
@@ -55,16 +55,16 @@ StageConstraints DocumentSourceChangeStreamTopologyChange::constraints(
}
-boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange>
-DocumentSourceChangeStreamTopologyChange::createFromBson(
+boost::intrusive_ptr<DocumentSourceChangeStreamCheckTopologyChange>
+DocumentSourceChangeStreamCheckTopologyChange::createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5669601,
str::stream() << "the '" << kStageName << "' spec must be an object",
elem.type() == Object && elem.Obj().isEmpty());
- return new DocumentSourceChangeStreamTopologyChange(expCtx);
+ return new DocumentSourceChangeStreamCheckTopologyChange(expCtx);
}
-DocumentSource::GetNextResult DocumentSourceChangeStreamTopologyChange::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doGetNext() {
auto nextInput = pSource->getNext();
if (!nextInput.isAdvanced()) {
@@ -87,7 +87,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTopologyChange::doGetNex
return nextInput;
}
-Value DocumentSourceChangeStreamTopologyChange::serializeLatest(
+Value DocumentSourceChangeStreamCheckTopologyChange::serializeLatest(
boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) {
return Value(DOC(DocumentSourceChangeStream::kStageName
diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h
index 8e03f75ca15..850130359e7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_topology_change.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h
@@ -45,18 +45,18 @@ namespace mongo {
* that previously may not have held any data for the collection being watched, and they contain the
* information necessary for the mongoS to include the new shard in the merged change stream.
*/
-class DocumentSourceChangeStreamTopologyChange final
+class DocumentSourceChangeStreamCheckTopologyChange final
: public DocumentSource,
public ChangeStreamStageSerializationInterface {
public:
- static constexpr StringData kStageName = "$_internalChangeStreamTopologyChange"_sd;
+ static constexpr StringData kStageName = "$_internalChangeStreamCheckTopologyChange"_sd;
- static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> createFromBson(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckTopologyChange> createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
- static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamCheckTopologyChange> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceChangeStreamTopologyChange(expCtx);
+ return new DocumentSourceChangeStreamCheckTopologyChange(expCtx);
}
const char* getSourceName() const final {
@@ -74,7 +74,8 @@ public:
}
private:
- DocumentSourceChangeStreamTopologyChange(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ DocumentSourceChangeStreamCheckTopologyChange(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
GetNextResult doGetNext() final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
index cb6dfc0ede1..7ea37e67da8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
@@ -40,8 +40,13 @@ DocumentSourceChangeStreamEnsureResumeTokenPresent::
boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
DocumentSourceChangeStreamEnsureResumeTokenPresent::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
- return new DocumentSourceChangeStreamEnsureResumeTokenPresent(expCtx, std::move(token));
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec) {
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ tassert(5666902,
+ "Expected non-high-water-mark resume token",
+ !ResumeToken::isHighWaterMarkToken(resumeToken));
+ return new DocumentSourceChangeStreamEnsureResumeTokenPresent(expCtx, std::move(resumeToken));
}
const char* DocumentSourceChangeStreamEnsureResumeTokenPresent::getSourceName() const {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
index b380f72d643..57b91107913 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h
@@ -58,7 +58,8 @@ public:
}
static boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
index 48a3e7db187..962e48fe4b8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
@@ -27,7 +27,7 @@
* it in the license file.
*/
-#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
+#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include <algorithm>
@@ -77,17 +77,17 @@ bool isShardConfigEvent(const Document& eventDoc) {
}
} // namespace
-boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard>
-DocumentSourceChangeStreamUpdateOnAddShard::create(
+boost::intrusive_ptr<DocumentSourceChangeStreamHandleTopologyChange>
+DocumentSourceChangeStreamHandleTopologyChange::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceChangeStreamUpdateOnAddShard(expCtx);
+ return new DocumentSourceChangeStreamHandleTopologyChange(expCtx);
}
-DocumentSourceChangeStreamUpdateOnAddShard::DocumentSourceChangeStreamUpdateOnAddShard(
+DocumentSourceChangeStreamHandleTopologyChange::DocumentSourceChangeStreamHandleTopologyChange(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
-DocumentSource::GetNextResult DocumentSourceChangeStreamUpdateOnAddShard::doGetNext() {
+DocumentSource::GetNextResult DocumentSourceChangeStreamHandleTopologyChange::doGetNext() {
// For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be
// populated. We also resolve the original aggregation command from the expression context.
if (!_mergeCursors) {
@@ -113,13 +113,13 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamUpdateOnAddShard::doGetN
return childResult;
}
-void DocumentSourceChangeStreamUpdateOnAddShard::addNewShardCursors(
+void DocumentSourceChangeStreamHandleTopologyChange::addNewShardCursors(
const Document& newShardDetectedObj) {
_mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
}
std::vector<RemoteCursor>
-DocumentSourceChangeStreamUpdateOnAddShard::establishShardCursorsOnNewShards(
+DocumentSourceChangeStreamHandleTopologyChange::establishShardCursorsOnNewShards(
const Document& newShardDetectedObj) {
// Reload the shard registry to see the new shard.
auto* opCtx = pExpCtx->opCtx;
@@ -146,7 +146,7 @@ DocumentSourceChangeStreamUpdateOnAddShard::establishShardCursorsOnNewShards(
allowPartialResults);
}
-BSONObj DocumentSourceChangeStreamUpdateOnAddShard::createUpdatedCommandForNewShard(
+BSONObj DocumentSourceChangeStreamHandleTopologyChange::createUpdatedCommandForNewShard(
Timestamp shardAddedTime) {
// We must start the new cursor from the moment at which the shard became visible.
const auto newShardAddedTime = LogicalTime{shardAddedTime};
@@ -178,7 +178,7 @@ BSONObj DocumentSourceChangeStreamUpdateOnAddShard::createUpdatedCommandForNewSh
true /* needsMerge */);
}
-BSONObj DocumentSourceChangeStreamUpdateOnAddShard::replaceResumeTokenInCommand(
+BSONObj DocumentSourceChangeStreamHandleTopologyChange::replaceResumeTokenInCommand(
Document resumeToken) {
Document originalCmd(_originalAggregateCommand);
auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray();
diff --git a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
index 4270ff8b1ec..1d240be5e3c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h
@@ -45,15 +45,15 @@ namespace mongo {
* the first time. When this event is detected, this stage will establish a new cursor on that
* shard and add it to the cursors being merged.
*/
-class DocumentSourceChangeStreamUpdateOnAddShard final : public DocumentSource {
+class DocumentSourceChangeStreamHandleTopologyChange final : public DocumentSource {
public:
- static constexpr StringData kStageName = "$_internalUpdateOnAddShard"_sd;
+ static constexpr StringData kStageName = "$_internalChangeStreamHandleTopologyChange"_sd;
/**
* Creates a new stage which will establish a new cursor and add it to the cursors being merged
* by 'mergeCursorsStage' whenever a new shard is detected by a change stream.
*/
- static boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard> create(
+ static boost::intrusive_ptr<DocumentSourceChangeStreamHandleTopologyChange> create(
const boost::intrusive_ptr<ExpressionContext>&);
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
@@ -77,7 +77,7 @@ public:
}
private:
- DocumentSourceChangeStreamUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&);
+ DocumentSourceChangeStreamHandleTopologyChange(const boost::intrusive_ptr<ExpressionContext>&);
GetNextResult doGetNext() final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
index 6f5e55cd47d..5dd28772c6c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
@@ -189,20 +189,14 @@ BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive)
<< BSON(OR(opMatch, commandAndApplyOpsMatch))));
}
-
} // namespace
boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch>
DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- bool showMigrationEvents) {
- // TODO SERVER-56669: ensure that 'initialPostBatchResumeToken' is always populated at this
- // point.
+ const DocumentSourceChangeStreamSpec& spec) {
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
return make_intrusive<DocumentSourceChangeStreamOplogMatch>(
- buildMatchFilter(
- expCtx,
- ResumeToken::parse(expCtx->initialPostBatchResumeToken).getData().clusterTime,
- showMigrationEvents),
- expCtx);
+ buildMatchFilter(expCtx, resumeToken.clusterTime, spec.getShowMigrationEvents()), expCtx);
}
boost::intrusive_ptr<DocumentSource> DocumentSourceChangeStreamOplogMatch::createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
index 06d6602ca49..68755f811b9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h
@@ -57,8 +57,10 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
static boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, bool showMigrationEvents);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
const char* getSourceName() const final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index df6fbb1e5d6..a0902b70e3c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -2094,9 +2094,9 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
DSCSTransformStageEmptySpecSerializeResumeAfter) {
auto expCtx = getExpCtx();
- const auto serializedStageName = getCSOptimizationFeatureFlagValue()
- ? DocumentSourceChangeStreamTransform::kStageName
- : DSChangeStream::kStageName;
+ auto featureFlag = getCSOptimizationFeatureFlagValue();
+ const auto serializedStageName =
+ featureFlag ? DocumentSourceChangeStreamTransform::kStageName : DSChangeStream::kStageName;
auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj());
@@ -2107,18 +2107,25 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest,
expCtx->initialPostBatchResumeToken = BSONObj();
});
- auto stage =
- DocumentSourceChangeStreamTransform::createFromBson(originalSpec.firstElement(), expCtx);
+ auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx);
ASSERT(!expCtx->initialPostBatchResumeToken.isEmpty());
- // Verify that an additional 'startAtOperationTime' is populated while serializing.
+ vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
+ ASSERT_EQ(allStages.size(), 5);
+ auto transformStage = allStages[2];
+ ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(transformStage.get()));
+
+
+ // Verify that an additional start point field is populated while serializing.
vector<Value> serialization;
- stage->serializeToArray(serialization);
+ transformStage->serializeToArray(serialization);
ASSERT_EQ(serialization.size(), 1UL);
ASSERT_EQ(serialization[0].getType(), BSONType::Object);
ASSERT(!serialization[0]
.getDocument()[serializedStageName]
- .getDocument()[DocumentSourceChangeStreamSpec::kResumeAfterFieldName]
+ .getDocument()[featureFlag
+ ? DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
+ : DocumentSourceChangeStreamSpec::kResumeAfterFieldName]
.missing());
}
@@ -2448,14 +2455,13 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
BSON("x" << 2 << "_id" << 1),
ResumeTokenData::FromInvalidate::kFromInvalidate);
- ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName
- << BSON("resumeAfter" << resumeTokenInvalidate
- << "startAtOperationTime" << kDefaultTs))
- .firstElement(),
- expCtx),
- AssertionException,
- ErrorCodes::InvalidResumeToken);
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeTokenInvalidate))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::InvalidResumeToken);
}
TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 0f62b87ec4f..229b9229565 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -47,11 +47,9 @@
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
-#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/update/update_oplog_entry_version.h"
-#include "mongo/db/vector_clock.h"
namespace mongo {
@@ -71,81 +69,57 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(
DocumentSourceChangeStreamTransform::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec) {
+ return new DocumentSourceChangeStreamTransform(expCtx, spec);
+}
+
intrusive_ptr<DocumentSourceChangeStreamTransform>
DocumentSourceChangeStreamTransform::createFromBson(
- BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ BSONElement rawSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467601,
"the '$_internalChangeStreamTransform' object spec must be an object",
- spec.type() == BSONType::Object);
-
- return new DocumentSourceChangeStreamTransform(expCtx, spec.Obj());
+ rawSpec.type() == BSONType::Object);
+ auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
+ rawSpec.Obj());
+ return new DocumentSourceChangeStreamTransform(expCtx, std::move(spec));
}
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec)
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec)
: DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx),
- _changeStreamSpec(DocumentSourceChangeStreamSpec::parse(
- IDLParserErrorContext("$changeStream"), changeStreamSpec)),
+ _changeStreamSpec(std::move(spec)),
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
// If the change stream spec requested a pre-image, make sure that we supply one.
_includePreImageOptime =
(_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
+ // Extract the resume token or high-water-mark from the spec.
+ auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
+
+ // Set the initialPostBatchResumeToken on the expression context.
+ expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON();
+
// If the change stream spec includes a resumeToken with a shard key, populate the document key
// cache with the field paths.
- auto resumeAfter = _changeStreamSpec.getResumeAfter();
- auto startAfter = _changeStreamSpec.getStartAfter();
-
- ResumeToken resumeToken;
- if (resumeAfter || startAfter) {
- resumeToken = resumeAfter ? resumeAfter.get() : startAfter.get();
- ResumeTokenData tokenData = resumeToken.getData();
-
- if (!tokenData.documentKey.missing() && tokenData.uuid) {
- std::vector<FieldPath> docKeyFields;
- auto docKey = tokenData.documentKey.getDocument();
-
- auto iter = docKey.fieldIterator();
- while (iter.more()) {
- auto fieldPair = iter.next();
- docKeyFields.push_back(fieldPair.first);
- }
+ if (!tokenData.documentKey.missing() && tokenData.uuid) {
+ std::vector<FieldPath> docKeyFields;
+ auto docKey = tokenData.documentKey.getDocument();
+
+ auto iter = docKey.fieldIterator();
+ while (iter.more()) {
+ auto fieldPair = iter.next();
+ docKeyFields.push_back(fieldPair.first);
+ }
- // If the document key from the resume token has more than one field, that means it
- // includes the shard key and thus should never change.
- const bool isFinal = docKeyFields.size() > 1;
+ // If the document key from the resume token has more than one field, that means it
+ // includes the shard key and thus should never change.
+ const bool isFinal = docKeyFields.size() > 1;
- _documentKeyCache[tokenData.uuid.get()] =
- DocumentKeyCacheEntry({docKeyFields, isFinal});
- }
- } else if (auto startAtOperationTime = _changeStreamSpec.getStartAtOperationTime()) {
- // TODO SERVER-56669: Move this change to populate resume token in 'ChangeStreamSpec' into
- // DocumentSourceChangeStream::create().
- resumeToken = ResumeToken::makeHighWaterMarkToken(*startAtOperationTime);
- } else {
- // If we do not have an explicit starting point, we should start from the latest majority
- // committed operation. If we are on mongoS and do not have a starting point, set it to the
- // current clusterTime so that all shards start in sync. We always start one tick beyond the
- // most recent operation, to ensure that the stream does not return it.
- auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
- const auto currentTime = !expCtx->inMongos
- ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()}
- : [&] {
- const auto currentTime = VectorClock::get(expCtx->opCtx)->getTime();
- return currentTime.clusterTime();
- }();
-
- // If we haven't already populated the initial PBRT, then we are starting from a specific
- // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
- const auto startAtTime = currentTime.addTicks(1).asTimestamp();
- resumeToken = ResumeToken::makeHighWaterMarkToken(startAtTime);
-
- // Make sure we update the 'resumeAfter' in the '_changeStreamSpec' so that we serialize the
- // correct resume token when sending it to the shards.
- _changeStreamSpec.setResumeAfter(resumeToken);
+ _documentKeyCache[tokenData.uuid.get()] = DocumentKeyCacheEntry({docKeyFields, isFinal});
}
- expCtx->initialPostBatchResumeToken = resumeToken.toDocument().toBson();
}
StageConstraints DocumentSourceChangeStreamTransform::constraints(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 24adaf8d5c7..46fe08b7a49 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -41,6 +41,10 @@ public:
/**
* Creates a new transformation stage from the given specification.
*/
+ static boost::intrusive_ptr<DocumentSourceChangeStreamTransform> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
+
static boost::intrusive_ptr<DocumentSourceChangeStreamTransform> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
@@ -69,8 +73,8 @@ protected:
private:
// This constructor is private, callers should use the 'create()' method above.
- DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>&,
- BSONObj changeStreamSpec);
+ DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ DocumentSourceChangeStreamSpec spec);
struct DocumentKeyCacheEntry {
DocumentKeyCacheEntry() = default;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index f81f1880616..9c470320d95 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -312,10 +312,13 @@ protected:
/**
* Convenience method to create the class under test with a given ResumeTokenData.
*/
+
intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
createDSEnsureResumeTokenPresent(ResumeTokenData tokenData) {
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAfter(ResumeToken(tokenData));
auto checkResumeToken =
- DocumentSourceChangeStreamEnsureResumeTokenPresent::create(getExpCtx(), tokenData);
+ DocumentSourceChangeStreamEnsureResumeTokenPresent::create(getExpCtx(), spec);
_mock->setResumeToken(std::move(tokenData));
checkResumeToken->setSource(_mock.get());
return checkResumeToken;
@@ -371,8 +374,10 @@ class CheckResumabilityTest : public CheckResumeTokenTest {
protected:
intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability(
ResumeTokenData tokenData) {
+ DocumentSourceChangeStreamSpec spec;
+ spec.setStartAfter(ResumeToken(tokenData));
auto dsCheckResumability =
- DocumentSourceChangeStreamCheckResumability::create(getExpCtx(), tokenData);
+ DocumentSourceChangeStreamCheckResumability::create(getExpCtx(), spec);
_mock->setResumeToken(std::move(tokenData));
dsCheckResumability->setSource(_mock.get());
return dsCheckResumability;
@@ -497,8 +502,8 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) {
// Verify that a resume token whose documentKey only contains _id can be used to resume a stream
// on a sharded collection as long as its _id matches the first document. We set 'inMongos'
- // since this behaviour is only applicable when
- // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS.
+ // since this behaviour is only applicable when DSCSEnsureResumeTokenPresent is running on
+ // mongoS.
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
@@ -537,8 +542,8 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes
TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumentKeyFields) {
// Verify that the relaxed _id check only applies if _id is the sole field present in the
// client's resume token, even if all the fields that are present match the first doc. We set
- // 'inMongos' since this is only applicable when
- // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS.
+ // 'inMongos' since this is only applicable when DSCSEnsureResumeTokenPresent is running on
+ // mongoS.
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
@@ -980,8 +985,7 @@ TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) {
TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event
- // ResumeToken.
+ // Set up the DSCSCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
auto dsCheckResumability = createDSCheckResumability(token);
@@ -1010,8 +1014,7 @@ TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeT
TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) {
Timestamp resumeTimestamp(100, 2);
- // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event
- // ResumeToken.
+ // Set up the DSCSCheckResumability to check for an exact event ResumeToken.
ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}}));
auto dsCheckResumability = createDSCheckResumability(token);
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 8f495bcb756..921e9c0d7a1 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -37,7 +37,7 @@
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
-#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h"
+#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"