summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream_transform.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp90
1 files changed, 32 insertions, 58 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 0f62b87ec4f..229b9229565 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -47,11 +47,9 @@
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
-#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/update/update_oplog_entry_version.h"
-#include "mongo/db/vector_clock.h"
namespace mongo {
@@ -71,81 +69,57 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(
DocumentSourceChangeStreamTransform::createFromBson,
feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV());
+intrusive_ptr<DocumentSourceChangeStreamTransform> DocumentSourceChangeStreamTransform::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec) {
+ return new DocumentSourceChangeStreamTransform(expCtx, spec);
+}
+
intrusive_ptr<DocumentSourceChangeStreamTransform>
DocumentSourceChangeStreamTransform::createFromBson(
- BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ BSONElement rawSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(5467601,
"the '$_internalChangeStreamTransform' object spec must be an object",
- spec.type() == BSONType::Object);
-
- return new DocumentSourceChangeStreamTransform(expCtx, spec.Obj());
+ rawSpec.type() == BSONType::Object);
+ auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
+ rawSpec.Obj());
+ return new DocumentSourceChangeStreamTransform(expCtx, std::move(spec));
}
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj changeStreamSpec)
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec)
: DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx),
- _changeStreamSpec(DocumentSourceChangeStreamSpec::parse(
- IDLParserErrorContext("$changeStream"), changeStreamSpec)),
+ _changeStreamSpec(std::move(spec)),
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
// If the change stream spec requested a pre-image, make sure that we supply one.
_includePreImageOptime =
(_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
+ // Extract the resume token or high-water-mark from the spec.
+ auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
+
+ // Set the initialPostBatchResumeToken on the expression context.
+ expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON();
+
// If the change stream spec includes a resumeToken with a shard key, populate the document key
// cache with the field paths.
- auto resumeAfter = _changeStreamSpec.getResumeAfter();
- auto startAfter = _changeStreamSpec.getStartAfter();
-
- ResumeToken resumeToken;
- if (resumeAfter || startAfter) {
- resumeToken = resumeAfter ? resumeAfter.get() : startAfter.get();
- ResumeTokenData tokenData = resumeToken.getData();
-
- if (!tokenData.documentKey.missing() && tokenData.uuid) {
- std::vector<FieldPath> docKeyFields;
- auto docKey = tokenData.documentKey.getDocument();
-
- auto iter = docKey.fieldIterator();
- while (iter.more()) {
- auto fieldPair = iter.next();
- docKeyFields.push_back(fieldPair.first);
- }
+ if (!tokenData.documentKey.missing() && tokenData.uuid) {
+ std::vector<FieldPath> docKeyFields;
+ auto docKey = tokenData.documentKey.getDocument();
+
+ auto iter = docKey.fieldIterator();
+ while (iter.more()) {
+ auto fieldPair = iter.next();
+ docKeyFields.push_back(fieldPair.first);
+ }
- // If the document key from the resume token has more than one field, that means it
- // includes the shard key and thus should never change.
- const bool isFinal = docKeyFields.size() > 1;
+ // If the document key from the resume token has more than one field, that means it
+ // includes the shard key and thus should never change.
+ const bool isFinal = docKeyFields.size() > 1;
- _documentKeyCache[tokenData.uuid.get()] =
- DocumentKeyCacheEntry({docKeyFields, isFinal});
- }
- } else if (auto startAtOperationTime = _changeStreamSpec.getStartAtOperationTime()) {
- // TODO SERVER-56669: Move this change to populate resume token in 'ChangeStreamSpec' into
- // DocumentSourceChangeStream::create().
- resumeToken = ResumeToken::makeHighWaterMarkToken(*startAtOperationTime);
- } else {
- // If we do not have an explicit starting point, we should start from the latest majority
- // committed operation. If we are on mongoS and do not have a starting point, set it to the
- // current clusterTime so that all shards start in sync. We always start one tick beyond the
- // most recent operation, to ensure that the stream does not return it.
- auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
- const auto currentTime = !expCtx->inMongos
- ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()}
- : [&] {
- const auto currentTime = VectorClock::get(expCtx->opCtx)->getTime();
- return currentTime.clusterTime();
- }();
-
- // If we haven't already populated the initial PBRT, then we are starting from a specific
- // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
- const auto startAtTime = currentTime.addTicks(1).asTimestamp();
- resumeToken = ResumeToken::makeHighWaterMarkToken(startAtTime);
-
- // Make sure we update the 'resumeAfter' in the '_changeStreamSpec' so that we serialize the
- // correct resume token when sending it to the shards.
- _changeStreamSpec.setResumeAfter(resumeToken);
+ _documentKeyCache[tokenData.uuid.get()] = DocumentKeyCacheEntry({docKeyFields, isFinal});
}
- expCtx->initialPostBatchResumeToken = resumeToken.toDocument().toBson();
}
StageConstraints DocumentSourceChangeStreamTransform::constraints(