diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream_transform.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 90 |
1 files changed, 32 insertions, 58 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 0f62b87ec4f..229b9229565 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -47,11 +47,9 @@ #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" -#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/update/update_oplog_entry_version.h" -#include "mongo/db/vector_clock.h" namespace mongo { @@ -71,81 +69,57 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE( DocumentSourceChangeStreamTransform::createFromBson, feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()); +intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) { + return new DocumentSourceChangeStreamTransform(expCtx, spec); +} + intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::createFromBson( - BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + BSONElement rawSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(5467601, "the '$_internalChangeStreamTransform' object spec must be an object", - spec.type() == BSONType::Object); - - return new DocumentSourceChangeStreamTransform(expCtx, spec.Obj()); + rawSpec.type() == BSONType::Object); + auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), + rawSpec.Obj()); + return new DocumentSourceChangeStreamTransform(expCtx, std::move(spec)); } DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( - const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec) + const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec) : DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx), - _changeStreamSpec(DocumentSourceChangeStreamSpec::parse( - IDLParserErrorContext("$changeStream"), changeStreamSpec)), + _changeStreamSpec(std::move(spec)), _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) { // If the change stream spec requested a pre-image, make sure that we supply one. _includePreImageOptime = (_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff); + // Extract the resume token or high-water-mark from the spec. + auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); + + // Set the initialPostBatchResumeToken on the expression context. + expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON(); + // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. - auto resumeAfter = _changeStreamSpec.getResumeAfter(); - auto startAfter = _changeStreamSpec.getStartAfter(); - - ResumeToken resumeToken; - if (resumeAfter || startAfter) { - resumeToken = resumeAfter ? resumeAfter.get() : startAfter.get(); - ResumeTokenData tokenData = resumeToken.getData(); - - if (!tokenData.documentKey.missing() && tokenData.uuid) { - std::vector<FieldPath> docKeyFields; - auto docKey = tokenData.documentKey.getDocument(); - - auto iter = docKey.fieldIterator(); - while (iter.more()) { - auto fieldPair = iter.next(); - docKeyFields.push_back(fieldPair.first); - } + if (!tokenData.documentKey.missing() && tokenData.uuid) { + std::vector<FieldPath> docKeyFields; + auto docKey = tokenData.documentKey.getDocument(); + + auto iter = docKey.fieldIterator(); + while (iter.more()) { + auto fieldPair = iter.next(); + docKeyFields.push_back(fieldPair.first); + } - // If the document key from the resume token has more than one field, that means it - // includes the shard key and thus should never change. - const bool isFinal = docKeyFields.size() > 1; + // If the document key from the resume token has more than one field, that means it + // includes the shard key and thus should never change. + const bool isFinal = docKeyFields.size() > 1; - _documentKeyCache[tokenData.uuid.get()] = - DocumentKeyCacheEntry({docKeyFields, isFinal}); - } - } else if (auto startAtOperationTime = _changeStreamSpec.getStartAtOperationTime()) { - // TODO SERVER-56669: Move this change to populate resume token in 'ChangeStreamSpec' into - // DocumentSourceChangeStream::create(). - resumeToken = ResumeToken::makeHighWaterMarkToken(*startAtOperationTime); - } else { - // If we do not have an explicit starting point, we should start from the latest majority - // committed operation. If we are on mongoS and do not have a starting point, set it to the - // current clusterTime so that all shards start in sync. We always start one tick beyond the - // most recent operation, to ensure that the stream does not return it. - auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); - const auto currentTime = !expCtx->inMongos - ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()} - : [&] { - const auto currentTime = VectorClock::get(expCtx->opCtx)->getTime(); - return currentTime.clusterTime(); - }(); - - // If we haven't already populated the initial PBRT, then we are starting from a specific - // timestamp rather than a resume token. Initialize the PBRT to a high water mark token. - const auto startAtTime = currentTime.addTicks(1).asTimestamp(); - resumeToken = ResumeToken::makeHighWaterMarkToken(startAtTime); - - // Make sure we update the 'resumeAfter' in the '_changeStreamSpec' so that we serialize the - // correct resume token when sending it to the shards. - _changeStreamSpec.setResumeAfter(resumeToken); + _documentKeyCache[tokenData.uuid.get()] = DocumentKeyCacheEntry({docKeyFields, isFinal}); } - expCtx->initialPostBatchResumeToken = resumeToken.toDocument().toBson(); } StageConstraints DocumentSourceChangeStreamTransform::constraints( |