diff options
Diffstat (limited to 'src/mongo/db/pipeline')
22 files changed, 351 insertions, 269 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 9c167fb3b3b..dea32771712 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -343,15 +343,15 @@ env.Library( 'document_source_change_stream.cpp', 'document_source_change_stream_check_invalidate.cpp', 'document_source_change_stream_check_resumability.cpp', + 'document_source_change_stream_check_topology_change.cpp', 'document_source_change_stream_close_cursor.cpp', 'document_source_change_stream_ensure_resume_token_present.cpp', + 'document_source_change_stream_handle_topology_change.cpp', 'document_source_change_stream_lookup_post_image.cpp', 'document_source_change_stream_lookup_pre_image.cpp', 'document_source_change_stream_oplog_match.cpp', - 'document_source_change_stream_topology_change.cpp', 'document_source_change_stream_transform.cpp', 'document_source_change_stream_unwind_transactions.cpp', - 'document_source_change_stream_update_on_add_shard.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/pipeline/pipeline', diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 65baaeb3e6b..0fff6070ba3 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -33,144 +33,79 @@ #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" +#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" #include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" +#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" #include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" -#include "mongo/db/pipeline/document_source_change_stream_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h" -#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" #include "mongo/db/pipeline/expression.h" namespace mongo { namespace change_stream_legacy { std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceChangeStreamSpec spec, - BSONElement rawSpec) { + const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) { std::list<boost::intrusive_ptr<DocumentSource>> stages; - boost::intrusive_ptr<DocumentSource> resumeStage = nullptr; - boost::optional<ResumeTokenData> startAfterInvalidate; - bool showMigrationEvents = spec.getShowMigrationEvents(); - uassert(31123, - "Change streams from mongos may not show migration events.", - !(expCtx->inMongos && showMigrationEvents)); - - auto resumeAfter = spec.getResumeAfter(); - auto startAfter = spec.getStartAfter(); - if (resumeAfter || startAfter) { - uassert(50865, - "Do not specify both 'resumeAfter' and 'startAfter' in a $changeStream stage", - !startAfter || !resumeAfter); - - const ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get(); - const ResumeTokenData tokenData = token.getData(); - - // If resuming from an "invalidate" using "startAfter", pass along the resume token data to - // DocumentSourceChangeStreamCheckInvalidate to signify that another invalidate should not - // be generated. - if (startAfter && tokenData.fromInvalidate) { - startAfterInvalidate = tokenData; - } - uassert(ErrorCodes::InvalidResumeToken, - "Attempting to resume a change stream using 'resumeAfter' is not allowed from an " - "invalidate notification.", - !resumeAfter || !tokenData.fromInvalidate); - - // If we are resuming a single-collection stream, the resume token should always contain a - // UUID unless the token is a high water mark. - uassert(ErrorCodes::InvalidResumeToken, - "Attempted to resume a single-collection stream, but the resume token does not " - "include a UUID.", - tokenData.uuid || !expCtx->isSingleNamespaceAggregation() || - ResumeToken::isHighWaterMarkToken(tokenData)); - - // For a regular resume token, we must ensure that (1) all shards are capable of resuming - // from the given clusterTime, and (2) that we observe the resume token event in the stream - // before any event that would sort after it. High water mark tokens, however, do not refer - // to a specific event; we thus only need to check (1), similar to 'startAtOperationTime'. - if (expCtx->needsMerge || ResumeToken::isHighWaterMarkToken(tokenData)) { - resumeStage = DocumentSourceChangeStreamCheckResumability::create(expCtx, tokenData); - } else { - resumeStage = - DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, tokenData); - } - } + const auto userRequestedResumePoint = + spec.getResumeAfter() || spec.getStartAfter() || spec.getStartAtOperationTime(); - // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'. - if (auto startAtOperationTime = spec.getStartAtOperationTime()) { - uassert(40674, - "Only one type of resume option is allowed, but multiple were found.", - !resumeStage); - resumeStage = - DocumentSourceChangeStreamCheckResumability::create(expCtx, *startAtOperationTime); + if (!userRequestedResumePoint) { + // Make sure we update the 'resumeAfter' in the 'spec' so that we serialize the + // correct resume token when sending it to the shards. + spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken( + DocumentSourceChangeStream::getStartTimeForNewStream(expCtx))); } - auto transformStage = DocumentSourceChangeStreamTransform::createFromBson(rawSpec, expCtx); + // Unfold the $changeStream into its constituent stages and add them to the pipeline. + stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec)); + stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, spec)); tassert(5467606, "'DocumentSourceChangeStreamTransform' stage should populate " "'initialPostBatchResumeToken' field", !expCtx->initialPostBatchResumeToken.isEmpty()); - // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies - // upon the fact that it is always the first stage in the pipeline. - stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, showMigrationEvents)); - - stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx)); - stages.push_back(transformStage); - - const bool csOptFeatureFlag = - feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV(); - - // The 'DocumentSourceChangeStreamTopologyChange' only runs in a cluster, and will be dispatched - // by mongoS to the shards. - if (csOptFeatureFlag && expCtx->inMongos) { - stages.push_back(DocumentSourceChangeStreamTopologyChange::create(expCtx)); - } - // The resume stage must come after the check invalidate stage so that the former can determine // whether the event that matches the resume token should be followed by an "invalidate" event. - stages.push_back( - DocumentSourceChangeStreamCheckInvalidate::create(expCtx, startAfterInvalidate)); - - // The resume stage 'DocumentSourceChangeStreamCheckResumability' should come before the split - // point stage 'DocumentSourceChangeStreamUpdateOnAddShard'. - if (resumeStage && - resumeStage->getSourceName() == DocumentSourceChangeStreamCheckResumability::kStageName) { - stages.push_back(resumeStage); - resumeStage.reset(); + stages.push_back(DocumentSourceChangeStreamCheckInvalidate::create(expCtx, spec)); + + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + + // If the user-requested resume point is a high water mark, or if we are running on the shards + // in a cluster, we must include a DSCSCheckResumability stage. + if (expCtx->needsMerge || + (userRequestedResumePoint && ResumeToken::isHighWaterMarkToken(resumeToken))) { + stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec)); } - // If the pipeline is build on MongoS, then the stage - // 'DocumentSourceChangeStreamUpdateOnAddShard' acts as the split point for the pipline. All - // stages before this stages will run on shards and all stages after and inclusive of this stage - // will run on the MongoS. + // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as + // the split point for the pipline. All stages before this stages will run on shards and all + // stages after and inclusive of this stage will run on the MongoS. if (expCtx->inMongos) { - stages.push_back(DocumentSourceChangeStreamUpdateOnAddShard::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx)); } - // This resume stage should be 'DocumentSourceChangeStreamEnsureResumeTokenPresent'. - if (resumeStage) { - stages.push_back(resumeStage); + // If the resume token is from an event, we must include a DSCSEnsureResumeTokenPresent stage. + // In a cluster, this will be on mongoS and should not be generated on the shards. + if (!expCtx->needsMerge && !ResumeToken::isHighWaterMarkToken(resumeToken)) { + stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec)); } if (!expCtx->needsMerge) { - if (!csOptFeatureFlag) { - // There should only be one close cursor stage. If we're on the shards and producing - // input to be merged, do not add a close cursor stage, since the mongos will already - // have one. - stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx)); - } - - // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here - // so that any $match stages which follow the $changeStream pipeline prefix may be able to - // skip ahead of the DSLPreImage stage. This allows a whole-db or whole-cluster stream to - // run on an instance where only some collections have pre-images enabled, so long as the - // user filters for only those namespaces. + // There should only be one close cursor stage. If we're on the shards and producing input + // to be merged, do not add a close cursor stage, since the mongos will already have one. + stages.push_back(DocumentSourceChangeStreamCloseCursor::create(expCtx)); + + // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage + // here so that any $match stages which follow the $changeStream pipeline prefix may be + // able to skip ahead of the DSCSLookupPreImage stage. This allows a whole-db or + // whole-cluster stream to run on an instance where only some collections have pre-images + // enabled, so long as the user filters for only those namespaces. // TODO SERVER-36941: figure out how to get this to work in a sharded cluster. if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) { invariant(!expCtx->inMongos); @@ -240,7 +175,7 @@ Value DocumentSourceChangeStreamLookupPostImage::serializeLegacy( return (explain ? Value{Document{{kStageName, Document()}}} : Value()); } -Value DocumentSourceChangeStreamTopologyChange::serializeLegacy( +Value DocumentSourceChangeStreamCheckTopologyChange::serializeLegacy( boost::optional<ExplainOptions::Verbosity> explain) const { return (explain ? Value{Document{{kStageName, Document()}}} : Value()); } diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h index e6c29e0ec90..4c790782a20 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h @@ -38,7 +38,6 @@ namespace mongo::change_stream_legacy { */ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, - const DocumentSourceChangeStreamSpec spec, - BSONElement rawSpec); + const DocumentSourceChangeStreamSpec spec); } // namespace mongo::change_stream_legacy diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 94c6f58adf0..c4de7837030 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -40,12 +40,15 @@ #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_check_invalidate.h" #include "mongo/db/pipeline/document_source_change_stream_check_resumability.h" +#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_close_cursor.h" +#include "mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h" +#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_post_image.h" #include "mongo/db/pipeline/document_source_change_stream_lookup_pre_image.h" +#include "mongo/db/pipeline/document_source_change_stream_oplog_match.h" #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transactions.h" -#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" @@ -151,10 +154,41 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac } } +ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec( + const DocumentSourceChangeStreamSpec& spec) { + if (spec.getStartAfter()) { + return spec.getStartAfter()->getData(); + } else if (spec.getResumeAfter()) { + return spec.getResumeAfter()->getData(); + } else if (spec.getStartAtOperationTime()) { + return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime()).getData(); + } + tasserted(5666901, + "Expected one of 'startAfter', 'resumeAfter' or 'startAtOperationTime' to be " + "populated in $changeStream spec"); +} + +Timestamp DocumentSourceChangeStream::getStartTimeForNewStream( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + // If we do not have an explicit starting point, we should start from the latest majority + // committed operation. If we are on mongoS and do not have a starting point, set it to the + // current clusterTime so that all shards start in sync. + auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); + const auto currentTime = + !expCtx->inMongos ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} : [&] { + const auto currentTime = VectorClock::get(expCtx->opCtx)->getTime(); + return currentTime.clusterTime(); + }(); + + // We always start one tick beyond the most recent operation, to ensure that the stream does not + // return it. + return currentTime.addTicks(1).asTimestamp(); +} + list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { uassert(50808, - "$changeStream stage expects a document as argument.", + "$changeStream stage expects a document as argument", elem.type() == BSONType::Object); auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), @@ -163,7 +197,79 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // Make sure that it is legal to run this $changeStream before proceeding. DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec); - return change_stream_legacy::buildPipeline(expCtx, spec, elem); + // If we see this stage on a shard, it means that the raw $changeStream stage was dispatched to + // us from an old mongoS. Build a legacy shard pipeline. + if (expCtx->needsMerge || + !feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) { + return change_stream_legacy::buildPipeline(expCtx, spec); + } + return _buildPipeline(expCtx, spec); +} + +std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_buildPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) { + std::list<boost::intrusive_ptr<DocumentSource>> stages; + + // If the user did not specify an explicit starting point, set it to the current time. + if (!spec.getResumeAfter() && !spec.getStartAfter() && !spec.getStartAtOperationTime()) { + // Make sure we update the 'startAtOperationTime' in the 'spec' so that we serialize the + // correct start point when sending it to the shards. + spec.setStartAtOperationTime(DocumentSourceChangeStream::getStartTimeForNewStream(expCtx)); + } + + // Obtain the resume token from the spec. This will be used when building the pipeline. + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + + // Unfold the $changeStream into its constituent stages and add them to the pipeline. + stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec)); + stages.push_back(DocumentSourceChangeStreamUnwindTransaction::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, spec)); + tassert(5666900, + "'DocumentSourceChangeStreamTransform' stage should populate " + "'initialPostBatchResumeToken' field", + !expCtx->initialPostBatchResumeToken.isEmpty()); + + // The resume stage must come after the check invalidate stage so that the former can determine + // whether the event that matches the resume token should be followed by an "invalidate" event. + stages.push_back(DocumentSourceChangeStreamCheckInvalidate::create(expCtx, spec)); + + // If the starting point is a high water mark, or if we will be splitting the pipeline for + // dispatch to the shards in a cluster, we must include a DSCSCheckResumability stage. + if (expCtx->inMongos || ResumeToken::isHighWaterMarkToken(resumeToken)) { + stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec)); + } + + // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as the + // split point for the pipline. All stages before this stages will run on shards and all stages + // after and inclusive of this stage will run on the MongoS. + if (expCtx->inMongos) { + stages.push_back(DocumentSourceChangeStreamCheckTopologyChange::create(expCtx)); + stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx)); + } + + // If the resume point is an event, we must include a DSCSEnsureResumeTokenPresent stage. + if (!ResumeToken::isHighWaterMarkToken(resumeToken)) { + stages.push_back(DocumentSourceChangeStreamEnsureResumeTokenPresent::create(expCtx, spec)); + } + + // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here + // so that any $match stages which follow the $changeStream pipeline prefix may be able to + // skip ahead of the DSCSLookupPreImage stage. This allows a whole-db or whole-cluster stream to + // run on an instance where only some collections have pre-images enabled, so long as the + // user filters for only those namespaces. + // TODO SERVER-36941: figure out how to get this to work in a sharded cluster. + if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) { + invariant(!expCtx->inMongos); + stages.push_back(DocumentSourceChangeStreamLookupPreImage::create(expCtx, spec)); + } + + // There should be only one post-image lookup stage. If we're on the shards and producing + // input to be merged, the lookup is done on the mongos. + if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) { + stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx)); + } + + return stages; } void DocumentSourceChangeStream::assertIsLegalSpecification( @@ -212,6 +318,35 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( uassert(51771, "the 'fullDocumentBeforeChange' option is not supported in a sharded cluster", !(shouldLookupPreImage && (expCtx->inMongos || expCtx->needsMerge))); + + uassert(31123, + "Change streams from mongos may not show migration events", + !(expCtx->inMongos && spec.getShowMigrationEvents())); + + uassert(50865, + "Do not specify both 'resumeAfter' and 'startAfter' in a $changeStream stage", + !spec.getResumeAfter() || !spec.getStartAfter()); + + auto resumeToken = (spec.getResumeAfter() || spec.getStartAfter()) + ? resolveResumeTokenFromSpec(spec) + : boost::optional<ResumeTokenData>(); + + uassert(40674, + "Only one type of resume option is allowed, but multiple were found", + !(spec.getStartAtOperationTime() && resumeToken)); + + uassert(ErrorCodes::InvalidResumeToken, + "Attempting to resume a change stream using 'resumeAfter' is not allowed from an " + "invalidate notification", + !(spec.getResumeAfter() && resumeToken->fromInvalidate)); + + // If we are resuming a single-collection stream, the resume token should always contain a + // UUID unless the token is a high water mark. + uassert(ErrorCodes::InvalidResumeToken, + "Attempted to resume a single-collection stream, but the resume token does not " + "include a UUID", + !resumeToken || resumeToken->uuid || !expCtx->isSingleNamespaceAggregation() || + ResumeToken::isHighWaterMarkToken(*resumeToken)); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 144dddf84d4..29b67fe5262 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -187,7 +187,25 @@ public: */ static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType); + /** + * Extracts the resume token from the given spec. If a 'startAtOperationTime' is specified, + * returns the equivalent high-watermark token. This method should only ever be called on a spec + * where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated. + */ + static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec); + + /** + * For a change stream with no resume information supplied by the user, returns the clusterTime + * at which the new stream should begin scanning the oplog. + */ + static Timestamp getStartTimeForNewStream( + const boost::intrusive_ptr<ExpressionContext>& expCtx); + private: + // Constructs and returns a series of stages representing the full change stream pipeline. + static std::list<boost::intrusive_ptr<DocumentSource>> _buildPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec); + // Helper function which throws if the $changeStream fails any of a series of semantic checks. // For instance, whether it is permitted to run given the current FCV, whether the namespace is // valid for the options specified in the spec, etc. diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp index a5ee091e326..10b60f55307 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp @@ -66,6 +66,17 @@ bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCt } // namespace boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> +DocumentSourceChangeStreamCheckInvalidate::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) { + // If resuming from an "invalidate" using "startAfter", pass along the resume token data to + // DSCSCheckInvalidate to signify that another invalidate should not be generated. + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + return new DocumentSourceChangeStreamCheckInvalidate( + expCtx, boost::make_optional(resumeToken.fromInvalidate, std::move(resumeToken))); +} + +boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> DocumentSourceChangeStreamCheckInvalidate::createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467602, @@ -77,9 +88,8 @@ DocumentSourceChangeStreamCheckInvalidate::createFromBson( spec.embeddedObject()); return new DocumentSourceChangeStreamCheckInvalidate( expCtx, - parsed.getStartAfterInvalidate() - ? boost::optional<ResumeTokenData>(parsed.getStartAfterInvalidate()->getData()) - : boost::none); + parsed.getStartAfterInvalidate() ? parsed.getStartAfterInvalidate()->getData() + : boost::optional<ResumeTokenData>()); } DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNext() { diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h index 92647a6b48d..30dc1b6eeda 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.h @@ -72,12 +72,10 @@ public: static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx); + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckInvalidate> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, - boost::optional<ResumeTokenData> startAfterInvalidate) { - return new DocumentSourceChangeStreamCheckInvalidate(expCtx, - std::move(startAfterInvalidate)); - } + const DocumentSourceChangeStreamSpec& spec); private: /** diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index ffb7ff6cd72..c22ea8d5c7d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -179,15 +179,23 @@ DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResu intrusive_ptr<DocumentSourceChangeStreamCheckResumability> DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx, - Timestamp ts) { - // We are resuming from a point in time, not an event. Seed the stage with a high water mark. - return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts).getData()); + const DocumentSourceChangeStreamSpec& spec) { + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(resumeToken)); } intrusive_ptr<DocumentSourceChangeStreamCheckResumability> -DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx, - ResumeTokenData token) { - return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(token)); +DocumentSourceChangeStreamCheckResumability::createFromBson( + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(5467603, + str::stream() << "the '" << kStageName << "' object spec must be an object", + spec.type() == Object); + + auto parsed = DocumentSourceChangeStreamCheckResumabilitySpec::parse( + IDLParserErrorContext("DocumentSourceChangeStreamCheckResumabilitySpec"), + spec.embeddedObject()); + return new DocumentSourceChangeStreamCheckResumability(expCtx, + parsed.getResumeToken().getData()); } const char* DocumentSourceChangeStreamCheckResumability::getSourceName() const { diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h index 9cbed08490a..21fad097655 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.h @@ -95,22 +95,11 @@ public: } static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createFromBson( - BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - uassert(5467603, - str::stream() << "the '" << kStageName << "' object spec must be an object", - spec.type() == Object); - - auto parsed = DocumentSourceChangeStreamCheckResumabilitySpec::parse( - IDLParserErrorContext("DocumentSourceChangeStreamCheckResumabilitySpec"), - spec.embeddedObject()); - return create(expCtx, parsed.getResumeToken().getData()); - } + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx); static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts); - - static boost::intrusive_ptr<DocumentSourceChangeStreamCheckResumability> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); protected: /** diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp index ce46192ac4a..724f6c1338f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp @@ -29,19 +29,19 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/document_source_change_stream_topology_change.h" +#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" #include "mongo/db/pipeline/change_stream_topology_change_info.h" namespace mongo { REGISTER_INTERNAL_DOCUMENT_SOURCE( - _internalChangeStreamTopologyChange, + _internalChangeStreamCheckTopologyChange, LiteParsedDocumentSourceChangeStreamInternal::parse, - DocumentSourceChangeStreamTopologyChange::createFromBson, + DocumentSourceChangeStreamCheckTopologyChange::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); -StageConstraints DocumentSourceChangeStreamTopologyChange::constraints( +StageConstraints DocumentSourceChangeStreamCheckTopologyChange::constraints( Pipeline::SplitState pipeState) const { return {StreamType::kStreaming, PositionRequirement::kNone, @@ -55,16 +55,16 @@ StageConstraints DocumentSourceChangeStreamTopologyChange::constraints( } -boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> -DocumentSourceChangeStreamTopologyChange::createFromBson( +boost::intrusive_ptr<DocumentSourceChangeStreamCheckTopologyChange> +DocumentSourceChangeStreamCheckTopologyChange::createFromBson( const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5669601, str::stream() << "the '" << kStageName << "' spec must be an object", elem.type() == Object && elem.Obj().isEmpty()); - return new DocumentSourceChangeStreamTopologyChange(expCtx); + return new DocumentSourceChangeStreamCheckTopologyChange(expCtx); } -DocumentSource::GetNextResult DocumentSourceChangeStreamTopologyChange::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doGetNext() { auto nextInput = pSource->getNext(); if (!nextInput.isAdvanced()) { @@ -87,7 +87,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTopologyChange::doGetNex return nextInput; } -Value DocumentSourceChangeStreamTopologyChange::serializeLatest( +Value DocumentSourceChangeStreamCheckTopologyChange::serializeLatest( boost::optional<ExplainOptions::Verbosity> explain) const { if (explain) { return Value(DOC(DocumentSourceChangeStream::kStageName diff --git a/src/mongo/db/pipeline/document_source_change_stream_topology_change.h b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h index 8e03f75ca15..850130359e7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_topology_change.h +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.h @@ -45,18 +45,18 @@ namespace mongo { * that previously may not have held any data for the collection being watched, and they contain the * information necessary for the mongoS to include the new shard in the merged change stream. */ -class DocumentSourceChangeStreamTopologyChange final +class DocumentSourceChangeStreamCheckTopologyChange final : public DocumentSource, public ChangeStreamStageSerializationInterface { public: - static constexpr StringData kStageName = "$_internalChangeStreamTopologyChange"_sd; + static constexpr StringData kStageName = "$_internalChangeStreamCheckTopologyChange"_sd; - static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> createFromBson( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckTopologyChange> createFromBson( const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - static boost::intrusive_ptr<DocumentSourceChangeStreamTopologyChange> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamCheckTopologyChange> create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceChangeStreamTopologyChange(expCtx); + return new DocumentSourceChangeStreamCheckTopologyChange(expCtx); } const char* getSourceName() const final { @@ -74,7 +74,8 @@ public: } private: - DocumentSourceChangeStreamTopologyChange(const boost::intrusive_ptr<ExpressionContext>& expCtx) + DocumentSourceChangeStreamCheckTopologyChange( + const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(kStageName, expCtx) {} GetNextResult doGetNext() final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp index cb6dfc0ede1..7ea37e67da8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp @@ -40,8 +40,13 @@ DocumentSourceChangeStreamEnsureResumeTokenPresent:: boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> DocumentSourceChangeStreamEnsureResumeTokenPresent::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) { - return new DocumentSourceChangeStreamEnsureResumeTokenPresent(expCtx, std::move(token)); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) { + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + tassert(5666902, + "Expected non-high-water-mark resume token", + !ResumeToken::isHighWaterMarkToken(resumeToken)); + return new DocumentSourceChangeStreamEnsureResumeTokenPresent(expCtx, std::move(resumeToken)); } const char* DocumentSourceChangeStreamEnsureResumeTokenPresent::getSourceName() const { diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h index b380f72d643..57b91107913 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.h @@ -58,7 +58,8 @@ public: } static boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 48a3e7db187..962e48fe4b8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -27,7 +27,7 @@ * it in the license file. */ -#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" +#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" #include <algorithm> @@ -77,17 +77,17 @@ bool isShardConfigEvent(const Document& eventDoc) { } } // namespace -boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard> -DocumentSourceChangeStreamUpdateOnAddShard::create( +boost::intrusive_ptr<DocumentSourceChangeStreamHandleTopologyChange> +DocumentSourceChangeStreamHandleTopologyChange::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceChangeStreamUpdateOnAddShard(expCtx); + return new DocumentSourceChangeStreamHandleTopologyChange(expCtx); } -DocumentSourceChangeStreamUpdateOnAddShard::DocumentSourceChangeStreamUpdateOnAddShard( +DocumentSourceChangeStreamHandleTopologyChange::DocumentSourceChangeStreamHandleTopologyChange( const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(kStageName, expCtx) {} -DocumentSource::GetNextResult DocumentSourceChangeStreamUpdateOnAddShard::doGetNext() { +DocumentSource::GetNextResult DocumentSourceChangeStreamHandleTopologyChange::doGetNext() { // For the first call to the 'doGetNext', the '_mergeCursors' will be null and must be // populated. We also resolve the original aggregation command from the expression context. if (!_mergeCursors) { @@ -113,13 +113,13 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamUpdateOnAddShard::doGetN return childResult; } -void DocumentSourceChangeStreamUpdateOnAddShard::addNewShardCursors( +void DocumentSourceChangeStreamHandleTopologyChange::addNewShardCursors( const Document& newShardDetectedObj) { _mergeCursors->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj)); } std::vector<RemoteCursor> -DocumentSourceChangeStreamUpdateOnAddShard::establishShardCursorsOnNewShards( +DocumentSourceChangeStreamHandleTopologyChange::establishShardCursorsOnNewShards( const Document& newShardDetectedObj) { // Reload the shard registry to see the new shard. auto* opCtx = pExpCtx->opCtx; @@ -146,7 +146,7 @@ DocumentSourceChangeStreamUpdateOnAddShard::establishShardCursorsOnNewShards( allowPartialResults); } -BSONObj DocumentSourceChangeStreamUpdateOnAddShard::createUpdatedCommandForNewShard( +BSONObj DocumentSourceChangeStreamHandleTopologyChange::createUpdatedCommandForNewShard( Timestamp shardAddedTime) { // We must start the new cursor from the moment at which the shard became visible. const auto newShardAddedTime = LogicalTime{shardAddedTime}; @@ -178,7 +178,7 @@ BSONObj DocumentSourceChangeStreamUpdateOnAddShard::createUpdatedCommandForNewSh true /* needsMerge */); } -BSONObj DocumentSourceChangeStreamUpdateOnAddShard::replaceResumeTokenInCommand( +BSONObj DocumentSourceChangeStreamHandleTopologyChange::replaceResumeTokenInCommand( Document resumeToken) { Document originalCmd(_originalAggregateCommand); auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray(); diff --git a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h index 4270ff8b1ec..1d240be5e3c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.h @@ -45,15 +45,15 @@ namespace mongo { * the first time. When this event is detected, this stage will establish a new cursor on that * shard and add it to the cursors being merged. */ -class DocumentSourceChangeStreamUpdateOnAddShard final : public DocumentSource { +class DocumentSourceChangeStreamHandleTopologyChange final : public DocumentSource { public: - static constexpr StringData kStageName = "$_internalUpdateOnAddShard"_sd; + static constexpr StringData kStageName = "$_internalChangeStreamHandleTopologyChange"_sd; /** * Creates a new stage which will establish a new cursor and add it to the cursors being merged * by 'mergeCursorsStage' whenever a new shard is detected by a change stream. */ - static boost::intrusive_ptr<DocumentSourceChangeStreamUpdateOnAddShard> create( + static boost::intrusive_ptr<DocumentSourceChangeStreamHandleTopologyChange> create( const boost::intrusive_ptr<ExpressionContext>&); Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { @@ -77,7 +77,7 @@ public: } private: - DocumentSourceChangeStreamUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&); + DocumentSourceChangeStreamHandleTopologyChange(const boost::intrusive_ptr<ExpressionContext>&); GetNextResult doGetNext() final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index 6f5e55cd47d..5dd28772c6c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -189,20 +189,14 @@ BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive) << BSON(OR(opMatch, commandAndApplyOpsMatch)))); } - } // namespace boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx, - bool showMigrationEvents) { - // TODO SERVER-56669: ensure that 'initialPostBatchResumeToken' is always populated at this - // point. + const DocumentSourceChangeStreamSpec& spec) { + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); return make_intrusive<DocumentSourceChangeStreamOplogMatch>( - buildMatchFilter( - expCtx, - ResumeToken::parse(expCtx->initialPostBatchResumeToken).getData().clusterTime, - showMigrationEvents), - expCtx); + buildMatchFilter(expCtx, resumeToken.clusterTime, spec.getShowMigrationEvents()), expCtx); } boost::intrusive_ptr<DocumentSource> DocumentSourceChangeStreamOplogMatch::createFromBson( diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h index 06d6602ca49..68755f811b9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.h @@ -57,8 +57,10 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + static boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx, bool showMigrationEvents); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); const char* getSourceName() const final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index df6fbb1e5d6..a0902b70e3c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -2094,9 +2094,9 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, DSCSTransformStageEmptySpecSerializeResumeAfter) { auto expCtx = getExpCtx(); - const auto serializedStageName = getCSOptimizationFeatureFlagValue() - ? DocumentSourceChangeStreamTransform::kStageName - : DSChangeStream::kStageName; + auto featureFlag = getCSOptimizationFeatureFlagValue(); + const auto serializedStageName = + featureFlag ? DocumentSourceChangeStreamTransform::kStageName : DSChangeStream::kStageName; auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj()); @@ -2107,18 +2107,25 @@ TEST_F(ChangeStreamStageWithDualFeatureFlagValueTest, expCtx->initialPostBatchResumeToken = BSONObj(); }); - auto stage = - DocumentSourceChangeStreamTransform::createFromBson(originalSpec.firstElement(), expCtx); + auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx); ASSERT(!expCtx->initialPostBatchResumeToken.isEmpty()); - // Verify that an additional 'startAtOperationTime' is populated while serializing. + vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); + ASSERT_EQ(allStages.size(), 5); + auto transformStage = allStages[2]; + ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(transformStage.get())); + + + // Verify that an additional start point field is populated while serializing. vector<Value> serialization; - stage->serializeToArray(serialization); + transformStage->serializeToArray(serialization); ASSERT_EQ(serialization.size(), 1UL); ASSERT_EQ(serialization[0].getType(), BSONType::Object); ASSERT(!serialization[0] .getDocument()[serializedStageName] - .getDocument()[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] + .getDocument()[featureFlag + ? DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName + : DocumentSourceChangeStreamSpec::kResumeAfterFieldName] .missing()); } @@ -2448,14 +2455,13 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) { BSON("x" << 2 << "_id" << 1), ResumeTokenData::FromInvalidate::kFromInvalidate); - ASSERT_THROWS_CODE(DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName - << BSON("resumeAfter" << resumeTokenInvalidate - << "startAtOperationTime" << kDefaultTs)) - .firstElement(), - expCtx), - AssertionException, - ErrorCodes::InvalidResumeToken); + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeTokenInvalidate)) + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::InvalidResumeToken); } TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 0f62b87ec4f..229b9229565 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -47,11 +47,9 @@ #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" -#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/update/update_oplog_entry_version.h" -#include "mongo/db/vector_clock.h" namespace mongo { @@ -71,81 +69,57 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE( DocumentSourceChangeStreamTransform::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) { + return new DocumentSourceChangeStreamTransform(expCtx, spec); +} + intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::createFromBson( - BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + BSONElement rawSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467601, "the '$_internalChangeStreamTransform' object spec must be an object", - spec.type() == BSONType::Object); - - return new DocumentSourceChangeStreamTransform(expCtx, spec.Obj()); + rawSpec.type() == BSONType::Object); + auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), + rawSpec.Obj()); + return new DocumentSourceChangeStreamTransform(expCtx, std::move(spec)); } DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( - const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec) + const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) : DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx), - _changeStreamSpec(DocumentSourceChangeStreamSpec::parse( - IDLParserErrorContext("$changeStream"), changeStreamSpec)), + _changeStreamSpec(std::move(spec)), _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) { // If the change stream spec requested a pre-image, make sure that we supply one. _includePreImageOptime = (_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff); + // Extract the resume token or high-water-mark from the spec. + auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); + + // Set the initialPostBatchResumeToken on the expression context. + expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON(); + // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. - auto resumeAfter = _changeStreamSpec.getResumeAfter(); - auto startAfter = _changeStreamSpec.getStartAfter(); - - ResumeToken resumeToken; - if (resumeAfter || startAfter) { - resumeToken = resumeAfter ? resumeAfter.get() : startAfter.get(); - ResumeTokenData tokenData = resumeToken.getData(); - - if (!tokenData.documentKey.missing() && tokenData.uuid) { - std::vector<FieldPath> docKeyFields; - auto docKey = tokenData.documentKey.getDocument(); - - auto iter = docKey.fieldIterator(); - while (iter.more()) { - auto fieldPair = iter.next(); - docKeyFields.push_back(fieldPair.first); - } + if (!tokenData.documentKey.missing() && tokenData.uuid) { + std::vector<FieldPath> docKeyFields; + auto docKey = tokenData.documentKey.getDocument(); + + auto iter = docKey.fieldIterator(); + while (iter.more()) { + auto fieldPair = iter.next(); + docKeyFields.push_back(fieldPair.first); + } - // If the document key from the resume token has more than one field, that means it - // includes the shard key and thus should never change. - const bool isFinal = docKeyFields.size() > 1; + // If the document key from the resume token has more than one field, that means it + // includes the shard key and thus should never change. + const bool isFinal = docKeyFields.size() > 1; - _documentKeyCache[tokenData.uuid.get()] = - DocumentKeyCacheEntry({docKeyFields, isFinal}); - } - } else if (auto startAtOperationTime = _changeStreamSpec.getStartAtOperationTime()) { - // TODO SERVER-56669: Move this change to populate resume token in 'ChangeStreamSpec' into - // DocumentSourceChangeStream::create(). - resumeToken = ResumeToken::makeHighWaterMarkToken(*startAtOperationTime); - } else { - // If we do not have an explicit starting point, we should start from the latest majority - // committed operation. If we are on mongoS and do not have a starting point, set it to the - // current clusterTime so that all shards start in sync. We always start one tick beyond the - // most recent operation, to ensure that the stream does not return it. - auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); - const auto currentTime = !expCtx->inMongos - ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} - : [&] { - const auto currentTime = VectorClock::get(expCtx->opCtx)->getTime(); - return currentTime.clusterTime(); - }(); - - // If we haven't already populated the initial PBRT, then we are starting from a specific - // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. - const auto startAtTime = currentTime.addTicks(1).asTimestamp(); - resumeToken = ResumeToken::makeHighWaterMarkToken(startAtTime); - - // Make sure we update the 'resumeAfter' in the '_changeStreamSpec' so that we serialize the - // correct resume token when sending it to the shards. - _changeStreamSpec.setResumeAfter(resumeToken); + _documentKeyCache[tokenData.uuid.get()] = DocumentKeyCacheEntry({docKeyFields, isFinal}); } - expCtx->initialPostBatchResumeToken = resumeToken.toDocument().toBson(); } StageConstraints DocumentSourceChangeStreamTransform::constraints( diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 24adaf8d5c7..46fe08b7a49 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -41,6 +41,10 @@ public: /** * Creates a new transformation stage from the given specification. */ + static boost::intrusive_ptr<DocumentSourceChangeStreamTransform> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); + static boost::intrusive_ptr<DocumentSourceChangeStreamTransform> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); @@ -69,8 +73,8 @@ protected: private: // This constructor is private, callers should use the 'create()' method above. - DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>&, - BSONObj changeStreamSpec); + DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx, + DocumentSourceChangeStreamSpec spec); struct DocumentKeyCacheEntry { DocumentKeyCacheEntry() = default; diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index f81f1880616..9c470320d95 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -312,10 +312,13 @@ protected: /** * Convenience method to create the class under test with a given ResumeTokenData. */ + intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> createDSEnsureResumeTokenPresent(ResumeTokenData tokenData) { + DocumentSourceChangeStreamSpec spec; + spec.setStartAfter(ResumeToken(tokenData)); auto checkResumeToken = - DocumentSourceChangeStreamEnsureResumeTokenPresent::create(getExpCtx(), tokenData); + DocumentSourceChangeStreamEnsureResumeTokenPresent::create(getExpCtx(), spec); _mock->setResumeToken(std::move(tokenData)); checkResumeToken->setSource(_mock.get()); return checkResumeToken; @@ -371,8 +374,10 @@ class CheckResumabilityTest : public CheckResumeTokenTest { protected: intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability( ResumeTokenData tokenData) { + DocumentSourceChangeStreamSpec spec; + spec.setStartAfter(ResumeToken(tokenData)); auto dsCheckResumability = - DocumentSourceChangeStreamCheckResumability::create(getExpCtx(), tokenData); + DocumentSourceChangeStreamCheckResumability::create(getExpCtx(), spec); _mock->setResumeToken(std::move(tokenData)); dsCheckResumability->setSource(_mock.get()); return dsCheckResumability; @@ -497,8 +502,8 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) { TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) { // Verify that a resume token whose documentKey only contains _id can be used to resume a stream // on a sharded collection as long as its _id matches the first document. We set 'inMongos' - // since this behaviour is only applicable when - // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS. + // since this behaviour is only applicable when DSCSEnsureResumeTokenPresent is running on + // mongoS. Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; @@ -537,8 +542,8 @@ TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoes TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumentKeyFields) { // Verify that the relaxed _id check only applies if _id is the sole field present in the // client's resume token, even if all the fields that are present match the first doc. We set - // 'inMongos' since this is only applicable when - // DocumentSourceChangeStreamEnsureResumeTokenPresent is running on mongoS. + // 'inMongos' since this is only applicable when DSCSEnsureResumeTokenPresent is running on + // mongoS. Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; @@ -980,8 +985,7 @@ TEST_F(CheckResumabilityTest, ShouldIgnoreOplogAfterFirstEOF) { TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event - // ResumeToken. + // Set up the DSCSCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); auto dsCheckResumability = createDSCheckResumability(token); @@ -1010,8 +1014,7 @@ TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimeUpToResumeT TEST_F(CheckResumabilityTest, ShouldSwallowAllEventsAtSameClusterTimePriorToResumeToken) { Timestamp resumeTimestamp(100, 2); - // Set up the DocumentSourceChangeStreamCheckResumability to check for an exact event - // ResumeToken. + // Set up the DSCSCheckResumability to check for an exact event ResumeToken. ResumeTokenData token(resumeTimestamp, 0, 0, testUuid(), Value(Document{{"_id"_sd, "3"_sd}})); auto dsCheckResumability = createDSCheckResumability(token); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 8f495bcb756..921e9c0d7a1 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -37,7 +37,7 @@ #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" -#include "mongo/db/pipeline/document_source_change_stream_update_on_add_shard.h" +#include "mongo/db/pipeline/document_source_change_stream_handle_topology_change.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" |