diff options
author | Ian Boros <ian.boros@10gen.com> | 2018-04-11 18:40:17 -0400 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2018-04-12 18:48:07 -0400 |
commit | 6841ce738419923002958acc760e150769b6f615 (patch) | |
tree | c32259c195861a612b437cbcb71d7e34da4ce2ec /src | |
parent | f98c59b381c74d2e136b2cce1c4436c02ee64741 (diff) | |
download | mongo-6841ce738419923002958acc760e150769b6f615.tar.gz |
SERVER-33114 make change streams unwind applyOps generated by transactions
Diffstat (limited to 'src')
7 files changed, 787 insertions, 285 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 88c56ab45b9..9bf8d695a42 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -271,6 +271,7 @@ pipelineeEnv.Library( 'document_source_bucket.cpp', 'document_source_bucket_auto.cpp', 'document_source_change_stream.cpp', + 'document_source_change_stream_transform.cpp', 'document_source_check_resume_token.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 2f7f185dcb2..1394cc7f0f8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -38,6 +38,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/pipeline/change_stream_constants.h" #include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" @@ -73,9 +74,12 @@ constexpr StringData DocumentSourceChangeStream::kFullDocumentField; constexpr StringData DocumentSourceChangeStream::kIdField; constexpr StringData DocumentSourceChangeStream::kNamespaceField; constexpr StringData DocumentSourceChangeStream::kUuidField; +constexpr StringData DocumentSourceChangeStream::kUpdateDescriptionField; constexpr StringData DocumentSourceChangeStream::kOperationTypeField; constexpr StringData DocumentSourceChangeStream::kStageName; constexpr StringData DocumentSourceChangeStream::kClusterTimeField; +constexpr StringData DocumentSourceChangeStream::kTxnNumberField; +constexpr StringData DocumentSourceChangeStream::kLsidField; constexpr StringData DocumentSourceChangeStream::kUpdateOpType; constexpr StringData DocumentSourceChangeStream::kDeleteOpType; constexpr StringData DocumentSourceChangeStream::kReplaceOpType; @@ -83,7 +87,6 @@ constexpr StringData DocumentSourceChangeStream::kInsertOpType; constexpr StringData DocumentSourceChangeStream::kInvalidateOpType; constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType; - namespace { static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; @@ -130,15 +133,6 @@ DocumentSourceOplogMatch::DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) : DocumentSourceMatch(std::move(filter), expCtx) {} -void checkValueType(const Value v, const StringData filedName, BSONType expectedType) { - uassert(40532, - str::stream() << "Entry field \"" << filedName << "\" should be " - << typeName(expectedType) - << ", found: " - << typeName(v.getType()), - (v.getType() == expectedType)); -} - namespace { /** * This stage is used internally for change notifications to close cursor after returning @@ -222,7 +216,8 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { auto doc = nextInput.getDocument(); const auto& kOperationTypeField = DocumentSourceChangeStream::kOperationTypeField; - checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String); + DocumentSourceChangeStream::checkValueType( + doc[kOperationTypeField], kOperationTypeField, BSONType::String); auto operationType = doc[kOperationTypeField].getString(); if (operationType == DocumentSourceChangeStream::kInvalidateOpType) { // Pass the invalidation forward, so that it can be included in the results, or @@ -236,6 +231,38 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { } // namespace +void DocumentSourceChangeStream::checkValueType(const Value v, + const StringData filedName, + BSONType expectedType) { + uassert(40532, + str::stream() << "Entry field \"" << filedName << "\" should be " + << typeName(expectedType) + << ", found: " + << typeName(v.getType()), + (v.getType() == expectedType)); +} + +// +// Helpers for building the oplog filter. +// +namespace { + +/** + * Constructs the filter which will match 'applyOps' oplog entries that are: + * 1) Part of a transaction + * 2) Have sub-entries which should be returned in the change stream + */ +BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) { + BSONObjBuilder applyOpsBuilder; + applyOpsBuilder.append("op", "c"); + applyOpsBuilder.append("lsid", BSON("$exists" << true)); + applyOpsBuilder.append("txnNumber", BSON("$exists" << true)); + const std::string& kApplyOpsNs = "o.applyOps.ns"; + applyOpsBuilder.appendAs(nsMatch, kApplyOpsNs); + return applyOpsBuilder.obj(); +} +} // namespace + BSONObj DocumentSourceChangeStream::buildMatchFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, @@ -324,11 +351,14 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( } auto opMatch = BSON(nsMatch["ns"] << OR(normalOpTypeMatch, chunkMigratedMatch)); + // 3) Look for 'applyOps' which were created as part of a transaction. + BSONObj applyOps = getTxnApplyOpsFilter(nsMatch["ns"], nss); + // Match oplog entries after "start" and are either supported (1) commands or (2) operations, // excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify // it was still present in the oplog. return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) - << BSON(OR(opMatch, commandMatch)) + << BSON(OR(opMatch, commandMatch, applyOps)) << BSON("fromMigrate" << NE << true))); } @@ -416,6 +446,13 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, } // namespace +std::string DocumentSourceChangeStream::buildAllCollectionsRegex(const NamespaceString& nss) { + // Match all namespaces that start with db name, followed by ".", then not followed by + // '$' or 'system.' + static const auto regexAllCollections = R"(\.(?!(\$|system\.)))"; + return "^" + nss.db() + regexAllCollections; +} + list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { // A change stream is a tailable + awaitData cursor. @@ -542,217 +579,7 @@ intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationSt // Mark the transformation stage as independent of any collection if the change stream is // watching all collections in the database. const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS(); - return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation( - expCtx, - stdx::make_unique<Transformation>(expCtx, changeStreamSpec), - kStageName.toString(), - isIndependentOfAnyCollection)); + return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform( + expCtx, changeStreamSpec, isIndependentOfAnyCollection)); } - -Document DocumentSourceChangeStream::Transformation::applyTransformation(const Document& input) { - // If we're executing a change stream pipeline that was forwarded from mongos, then we expect it - // to "need merge"---we expect to be executing the shards part of a split pipeline. It is never - // correct for mongos to pass through the change stream without splitting into into a merging - // part executed on mongos and a shards part. - // - // This is necessary so that mongos can correctly handle "invalidate" and "retryNeeded" change - // notifications. See SERVER-31978 for an example of why the pipeline must be split. - // - // We have to check this invariant at run-time of the change stream rather than parse time, - // since a mongos may forward a change stream in an invalid position (e.g. in a nested $lookup - // or $facet pipeline). In this case, mongod is responsible for parsing the pipeline and - // throwing an error without ever executing the change stream. - if (_expCtx->fromMongos) { - invariant(_expCtx->needsMerge); - } - - MutableDocument doc; - - // Extract the fields we need. - checkValueType(input[repl::OplogEntry::kOpTypeFieldName], - repl::OplogEntry::kOpTypeFieldName, - BSONType::String); - string op = input[repl::OplogEntry::kOpTypeFieldName].getString(); - Value ts = input[repl::OplogEntry::kTimestampFieldName]; - Value ns = input[repl::OplogEntry::kNamespaceFieldName]; - checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String); - Value uuid = input[repl::OplogEntry::kUuidFieldName]; - std::vector<FieldPath> documentKeyFields; - - // Deal with CRUD operations and commands. - auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op); - - // Ignore commands in the oplog when looking up the document key fields since a command implies - // that the change stream is about to be invalidated (e.g. collection drop). - if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) { - checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData); - // We need to retrieve the document key fields if our cache does not have an entry for this - // UUID or if the cache entry is not definitively final, indicating that the collection was - // unsharded when the entry was last populated. - auto it = _documentKeyCache.find(uuid.getUuid()); - if (it == _documentKeyCache.end() || !it->second.isFinal) { - auto docKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields( - _expCtx->opCtx, uuid.getUuid()); - if (it == _documentKeyCache.end() || docKeyFields.second) { - _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields); - } - } - - documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields; - } - NamespaceString nss(ns.getString()); - Value id = input.getNestedField("o._id"); - // Non-replace updates have the _id in field "o2". - StringData operationType; - Value fullDocument; - Value updateDescription; - Value documentKey; - - switch (opType) { - case repl::OpTypeEnum::kInsert: { - operationType = kInsertOpType; - fullDocument = input[repl::OplogEntry::kObjectFieldName]; - documentKey = Value(document_path_support::extractDocumentKeyFromDoc( - fullDocument.getDocument(), documentKeyFields)); - break; - } - case repl::OpTypeEnum::kDelete: { - operationType = kDeleteOpType; - documentKey = input[repl::OplogEntry::kObjectFieldName]; - break; - } - case repl::OpTypeEnum::kUpdate: { - if (id.missing()) { - operationType = kUpdateOpType; - checkValueType(input[repl::OplogEntry::kObjectFieldName], - repl::OplogEntry::kObjectFieldName, - BSONType::Object); - Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); - Value updatedFields = opObject["$set"]; - Value removedFields = opObject["$unset"]; - - // Extract the field names of $unset document. - vector<Value> removedFieldsVector; - if (removedFields.getType() == BSONType::Object) { - auto iter = removedFields.getDocument().fieldIterator(); - while (iter.more()) { - removedFieldsVector.push_back(Value(iter.next().first)); - } - } - updateDescription = Value(Document{ - {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, - {"removedFields", removedFieldsVector}}); - } else { - operationType = kReplaceOpType; - fullDocument = input[repl::OplogEntry::kObjectFieldName]; - } - documentKey = input[repl::OplogEntry::kObject2FieldName]; - break; - } - case repl::OpTypeEnum::kCommand: { - // Any command that makes it through our filter is an invalidating command such as a - // drop. - operationType = kInvalidateOpType; - // Make sure the result doesn't have a document key. - documentKey = Value(); - break; - } - case repl::OpTypeEnum::kNoop: { - operationType = kNewShardDetectedOpType; - // Generate a fake document Id for NewShardDetected operation so that we can resume - // after this operation. - documentKey = Value(Document{{kIdField, input[repl::OplogEntry::kObject2FieldName]}}); - break; - } - default: { MONGO_UNREACHABLE; } - } - - // UUID should always be present except for invalidate entries. It will not be under - // FCV 3.4, so we should close the stream as invalid. - if (operationType != kInvalidateOpType && uuid.missing()) { - warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get " - "downgraded after opening the stream?"; - operationType = kInvalidateOpType; - fullDocument = Value(); - updateDescription = Value(); - documentKey = Value(); - } - - // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will - // not appear in the output. - ResumeTokenData resumeTokenData; - resumeTokenData.clusterTime = ts.getTimestamp(); - resumeTokenData.documentKey = documentKey; - if (!uuid.missing()) - resumeTokenData.uuid = uuid.getUuid(); - doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument())); - doc.addField(kOperationTypeField, Value(operationType)); - doc.addField(kClusterTimeField, Value(resumeTokenData.clusterTime)); - - // If we're in a sharded environment, we'll need to merge the results by their sort key, so add - // that as metadata. - if (_expCtx->needsMerge) { - doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); - } - - // "invalidate" and "newShardDetected" entries have fewer fields. - if (operationType == kInvalidateOpType || operationType == kNewShardDetectedOpType) { - return doc.freeze(); - } - - doc.addField(kFullDocumentField, fullDocument); - doc.addField(kNamespaceField, Value(Document{{"db", nss.db()}, {"coll", nss.coll()}})); - doc.addField(kDocumentKeyField, documentKey); - - // Note that 'updateDescription' might be the 'missing' value, in which case it will not be - // serialized. - doc.addField("updateDescription", updateDescription); - return doc.freeze(); -} - -Document DocumentSourceChangeStream::Transformation::serializeStageOptions( - boost::optional<ExplainOptions::Verbosity> explain) const { - Document changeStreamOptions(_changeStreamSpec); - // If we're on a mongos and no other start time is specified, we want to start at the current - // cluster time on the mongos. This ensures all shards use the same start time. - if (_expCtx->inMongos && - changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName].missing() && - changeStreamOptions - [DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName] - .missing() && - changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] - .missing()) { - MutableDocument newChangeStreamOptions(changeStreamOptions); - - // Use the current cluster time plus 1 tick since the oplog query will include all - // operations/commands equal to or greater than the 'startAtClusterTime' timestamp. In - // particular, avoid including the last operation that went through mongos in an attempt to - // match the behavior of a replica set more closely. - auto clusterTime = LogicalClock::get(_expCtx->opCtx)->getClusterTime(); - clusterTime.addTicks(1); - newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] - [ResumeTokenClusterTime::kTimestampFieldName] = - Value(clusterTime.asTimestamp()); - changeStreamOptions = newChangeStreamOptions.freeze(); - } - return changeStreamOptions; -} - -DocumentSource::GetDepsReturn DocumentSourceChangeStream::Transformation::addDependencies( - DepsTracker* deps) const { - deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString()); - deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString()); - deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString()); - deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString()); - deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString()); - deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString()); - return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL; -} - -DocumentSource::GetModPathsReturn DocumentSourceChangeStream::Transformation::getModifiedPaths() - const { - // All paths are modified. - return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}}; -} - } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 6d7ac9e3eba..64d2595604c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -86,47 +86,6 @@ public: const NamespaceString _nss; }; - class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface { - public: - Transformation(const boost::intrusive_ptr<ExpressionContext>& expCtx, - BSONObj changeStreamSpec) - : _expCtx(expCtx), _changeStreamSpec(changeStreamSpec.getOwned()) {} - ~Transformation() = default; - Document applyTransformation(const Document& input) final; - TransformerType getType() const final { - return TransformerType::kChangeStreamTransformation; - }; - void optimize() final{}; - Document serializeStageOptions( - boost::optional<ExplainOptions::Verbosity> explain) const final; - DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const final; - DocumentSource::GetModPathsReturn getModifiedPaths() const final; - - private: - boost::intrusive_ptr<ExpressionContext> _expCtx; - BSONObj _changeStreamSpec; - - struct DocumentKeyCacheEntry { - DocumentKeyCacheEntry() = default; - - DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn) - : documentKeyFields(documentKeyFieldsIn.first), - isFinal(documentKeyFieldsIn.second){}; - // Fields of the document key, in order, including "_id" and the shard key if the - // collection is sharded. Empty until the first oplog entry with a uuid is encountered. - // Needed for transforming 'insert' oplog entries. - std::vector<FieldPath> documentKeyFields; - - // Set to true if the document key fields for this entry are definitively known and will - // not change. This implies that either the collection has become sharded or has been - // dropped. - bool isFinal; - }; - - // Map of collection UUID to document key fields. - std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache; - }; - // The name of the field where the document key (_id and shard key, if present) will be found // after the transformation. static constexpr StringData kDocumentKeyField = "documentKey"_sd; @@ -142,6 +101,10 @@ public: // transformation. static constexpr StringData kNamespaceField = "ns"_sd; + // Name of the field which stores information about updates. Only applies when OperationType + // is "update". + static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd; + // The name of the subfield of '_id' where the UUID of the namespace will be located after the // transformation. static constexpr StringData kUuidField = "uuid"_sd; @@ -158,6 +121,9 @@ public: // The name of this stage. static constexpr StringData kStageName = "$changeStream"_sd; + static constexpr StringData kTxnNumberField = "txnNumber"_sd; + static constexpr StringData kLsidField = "lsid"_sd; + // The different types of operations we can use for the operation type. static constexpr StringData kUpdateOpType = "update"_sd; static constexpr StringData kDeleteOpType = "delete"_sd; @@ -175,6 +141,8 @@ public: Timestamp startFrom, bool startFromInclusive); + static std::string buildAllCollectionsRegex(const NamespaceString& nss); + /** * Parses a $changeStream stage from 'elem' and produces the $match and transformation * stages required. @@ -194,6 +162,12 @@ public: static BSONObj replaceResumeTokenInCommand(const BSONObj originalCmdObj, const BSONObj resumeToken); + /** + * Helper used by various change stream stages. Used for asserting that a certain Value of a + * field has a certain type. Will uassert() if the field does not have the expected type. + */ + static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType); + private: enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster }; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5e51059853c..45f2c3373d4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -124,7 +125,7 @@ public: * Returns a list of stages expanded from a $changStream specification, starting with a * DocumentSourceMock which contains a single document representing 'entry'. */ - vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { + vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) { const auto spec = fromjson("{$changeStream: {}}"); list<intrusive_ptr<DocumentSource>> result = DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); @@ -142,7 +143,7 @@ public: ASSERT(match); auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx()); - auto mock = DocumentSourceMock::create(D(entry.toBSON())); + auto mock = DocumentSourceMock::create(D(entry)); executableMatch->setSource(mock.get()); // Check the oplog entry is transformed correctly. @@ -158,6 +159,10 @@ public: return {mock, executableMatch, transform, closeCursor}; } + vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { + return makeStages(entry.toBSON()); + } + OplogEntry createCommand(const BSONObj& oField, const boost::optional<UUID> uuid = boost::none, const boost::optional<bool> fromMigrate = boost::none, @@ -183,6 +188,40 @@ public: } /** + * Helper for running an applyOps through the pipeline, and getting all of the results. + */ + std::vector<Document> getApplyOpsResults(const Document& applyOpsDoc, + const LogicalSessionFromClient& lsid) { + BSONObj applyOpsObj = applyOpsDoc.toBson(); + + // Create an oplog entry and then glue on an lsid and txnNumber + auto baseOplogEntry = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOpsObj, + testUuid(), + boost::none, // fromMigrate + BSONObj()); + BSONObjBuilder builder(baseOplogEntry.toBSON()); + builder.append("lsid", lsid.toBSON()); + builder.append("txnNumber", 0LL); + BSONObj oplogEntry = builder.done(); + + // Create the stages and check that the documents produced matched those in the applyOps. + vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry); + auto transform = stages[2].get(); + invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); + + std::vector<Document> res; + auto next = transform->getNext(); + while (next.isAdvanced()) { + res.push_back(next.releaseDocument()); + next = transform->getNext(); + } + return res; + } + + + /** * This method is required to avoid a static initialization fiasco resulting from calling * UUID::gen() in file static scope. */ @@ -191,6 +230,15 @@ public: return *uuid_gen; } + static LogicalSessionFromClient testLsid() { + // Required to avoid static initialization fiasco. + static const UUID* uuid = new UUID(UUID::gen()); + LogicalSessionFromClient lsid{}; + lsid.setId(*uuid); + return lsid; + } + + /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. */ @@ -633,6 +681,116 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { checkTransformation(newShardDetected, expectedNewShardDetected); } +TEST_F(ChangeStreamStageTest, TransformEmptyApplyOps) { + Document applyOpsDoc{{"applyOps", Value{std::vector<Document>{}}}}; + + LogicalSessionFromClient lsid = testLsid(); + vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid); + + // Should not return anything. + ASSERT_EQ(results.size(), 0u); +} + +TEST_F(ChangeStreamStageTest, TransformNonTransactionApplyOps) { + BSONObj applyOpsObj = Document{{"applyOps", + Value{std::vector<Document>{Document{ + {"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}} + .toBson(); + + // Don't append lsid or txnNumber + + auto oplogEntry = makeOplogEntry(OpTypeEnum::kCommand, + nss.getCommandNS(), + applyOpsObj, + testUuid(), + boost::none, // fromMigrate + BSONObj()); + + + checkTransformation(oplogEntry, boost::none); +} + +TEST_F(ChangeStreamStageTest, TransformApplyOpsWithEntriesOnDifferentNs) { + // Doesn't use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + + auto otherUUID = UUID::gen(); + Document applyOpsDoc{ + {"applyOps", + Value{std::vector<Document>{ + Document{{"op", "i"_sd}, + {"ns", "someotherdb.collname"_sd}, + {"ui", otherUUID}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}, + Document{{"op", "u"_sd}, + {"ns", "someotherdb.collname"_sd}, + {"ui", otherUUID}, + {"o", Value{Document{{"$set", Value{Document{{"x", "hallo 2"_sd}}}}}}}, + {"o2", Value{Document{{"_id", 123}}}}}, + }}}, + }; + LogicalSessionFromClient lsid = testLsid(); + vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid); + + // All documents should be skipped. + ASSERT_EQ(results.size(), 0u); +} + + +TEST_F(ChangeStreamStageTest, TransformApplyOps) { + // Doesn't use the checkTransformation() pattern that other tests use since we expect multiple + // documents to be returned from one applyOps. + + Document applyOpsDoc{ + {"applyOps", + Value{std::vector<Document>{ + Document{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}, + Document{{"op", "u"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"$set", Value{Document{{"x", "hallo 2"_sd}}}}}}}, + {"o2", Value{Document{{"_id", 123}}}}}, + // Operation on another namespace which should be skipped. + Document{{"op", "i"_sd}, + {"ns", "someotherdb.collname"_sd}, + {"ui", UUID::gen()}, + {"o", Value{Document{{"_id", 0}, {"x", "Should not read this!"_sd}}}}}, + }}}, + }; + LogicalSessionFromClient lsid = testLsid(); + vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid); + + // The third document should be skipped. + ASSERT_EQ(results.size(), 2u); + + // Check that the first document is correct. + auto nextDoc = results[0]; + ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["x"].getString(), "hallo"); + ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0); + + // Check the second document. + nextDoc = results[1]; + ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kUpdateOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kDocumentKeyField]["_id"].getInt(), 123); + ASSERT_EQ(nextDoc[DSChangeStream::kUpdateDescriptionField]["updatedFields"]["x"].getString(), + "hallo 2"); + ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0); + + // The third document is skipped. +} + TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { const Timestamp ts(3, 45); const long long term = 4; @@ -727,7 +885,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result)); ASSERT_EQ(allStages.size(), 3UL); auto stage = allStages[1]; - ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(stage.get())); + ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get())); // // Serialize the stage and confirm contents. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp new file mode 100644 index 00000000000..16301b7fcd5 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -0,0 +1,423 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_change_stream_transform.h" + +#include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/db/bson/bson_helper.h" +#include "mongo/db/catalog/uuid_catalog.h" +#include "mongo/db/commands/feature_compatibility_version_documentation.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/pipeline/change_stream_constants.h" +#include "mongo/db/pipeline/document_path_support.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/pipeline/resume_token.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { + +using boost::intrusive_ptr; +using boost::optional; +using std::list; +using std::string; +using std::vector; + +namespace { +constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; + +bool isOpTypeRelevant(const Document& d) { + Value op = d["op"]; + invariant(!op.missing()); + + if (op.getString() != "n") { + return true; + } + + Value type = d.getNestedField("o2.type"); + if (!type.missing() && type.getString() == "migrateChunkToNewShard") { + return true; + } + + return false; +} +} // namespace + +DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + BSONObj changeStreamSpec, + bool isIndependentOfAnyCollection) + : DocumentSource(expCtx), + _changeStreamSpec(changeStreamSpec.getOwned()), + _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { + + if (expCtx->ns.isCollectionlessAggregateNS()) { + _nsRegex.emplace(DocumentSourceChangeStream::buildAllCollectionsRegex(expCtx->ns)); + } +} + +DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints( + Pipeline::SplitState pipeState) const { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage); + + constraints.canSwapWithMatch = true; + constraints.canSwapWithLimit = true; + // This transformation could be part of a 'collectionless' change stream on an entire + // database or cluster, mark as independent of any collection if so. + constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection; + return constraints; +} + +void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Document& input) { + invariant(!_txnContext); + + checkValueType(input["o"], "o", BSONType::Object); + Value applyOps = input.getNestedField("o.applyOps"); + checkValueType(applyOps, "applyOps", BSONType::Array); + invariant(applyOps.getArrayLength() > 0); + + Value lsid = input["lsid"]; + checkValueType(lsid, "lsid", BSONType::Object); + + Value txnNumber = input["txnNumber"]; + checkValueType(txnNumber, "txnNumber", BSONType::NumberLong); + + _txnContext.emplace(applyOps, lsid.getDocument(), txnNumber.getLong()); +} + +Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) { + // If we're executing a change stream pipeline that was forwarded from mongos, then we expect it + // to "need merge"---we expect to be executing the shards part of a split pipeline. It is never + // correct for mongos to pass through the change stream without splitting into into a merging + // part executed on mongos and a shards part. + // + // This is necessary so that mongos can correctly handle "invalidate" and "retryNeeded" change + // notifications. See SERVER-31978 for an example of why the pipeline must be split. + // + // We have to check this invariant at run-time of the change stream rather than parse time, + // since a mongos may forward a change stream in an invalid position (e.g. in a nested $lookup + // or $facet pipeline). In this case, mongod is responsible for parsing the pipeline and + // throwing an error without ever executing the change stream. + if (pExpCtx->fromMongos) { + invariant(pExpCtx->needsMerge); + } + + MutableDocument doc; + + // Extract the fields we need. + checkValueType(input[repl::OplogEntry::kOpTypeFieldName], + repl::OplogEntry::kOpTypeFieldName, + BSONType::String); + string op = input[repl::OplogEntry::kOpTypeFieldName].getString(); + Value ts = input[repl::OplogEntry::kTimestampFieldName]; + Value ns = input[repl::OplogEntry::kNamespaceFieldName]; + checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String); + Value uuid = input[repl::OplogEntry::kUuidFieldName]; + std::vector<FieldPath> documentKeyFields; + + // Deal with CRUD operations and commands. + auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op); + + // Ignore commands in the oplog when looking up the document key fields since a command implies + // that the change stream is about to be invalidated (e.g. collection drop). + if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) { + checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData); + // We need to retrieve the document key fields if our cache does not have an entry for this + // UUID or if the cache entry is not definitively final, indicating that the collection was + // unsharded when the entry was last populated. + auto it = _documentKeyCache.find(uuid.getUuid()); + if (it == _documentKeyCache.end() || !it->second.isFinal) { + auto docKeyFields = pExpCtx->mongoProcessInterface->collectDocumentKeyFields( + pExpCtx->opCtx, uuid.getUuid()); + if (it == _documentKeyCache.end() || docKeyFields.second) { + _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields); + } + } + + documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields; + } + NamespaceString nss(ns.getString()); + Value id = input.getNestedField("o._id"); + // Non-replace updates have the _id in field "o2". + StringData operationType; + Value fullDocument; + Value updateDescription; + Value documentKey; + + switch (opType) { + case repl::OpTypeEnum::kInsert: { + operationType = DocumentSourceChangeStream::kInsertOpType; + fullDocument = input[repl::OplogEntry::kObjectFieldName]; + documentKey = Value(document_path_support::extractDocumentKeyFromDoc( + fullDocument.getDocument(), documentKeyFields)); + break; + } + case repl::OpTypeEnum::kDelete: { + operationType = DocumentSourceChangeStream::kDeleteOpType; + documentKey = input[repl::OplogEntry::kObjectFieldName]; + break; + } + case repl::OpTypeEnum::kUpdate: { + if (id.missing()) { + operationType = DocumentSourceChangeStream::kUpdateOpType; + checkValueType(input[repl::OplogEntry::kObjectFieldName], + repl::OplogEntry::kObjectFieldName, + BSONType::Object); + Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); + Value updatedFields = opObject["$set"]; + Value removedFields = opObject["$unset"]; + + // Extract the field names of $unset document. + vector<Value> removedFieldsVector; + if (removedFields.getType() == BSONType::Object) { + auto iter = removedFields.getDocument().fieldIterator(); + while (iter.more()) { + removedFieldsVector.push_back(Value(iter.next().first)); + } + } + updateDescription = Value(Document{ + {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, + {"removedFields", removedFieldsVector}}); + } else { + operationType = DocumentSourceChangeStream::kReplaceOpType; + fullDocument = input[repl::OplogEntry::kObjectFieldName]; + } + documentKey = input[repl::OplogEntry::kObject2FieldName]; + break; + } + case repl::OpTypeEnum::kCommand: { + if (!input.getNestedField("o.applyOps").missing()) { + // We should never see an applyOps inside of an applyOps that made it past the + // filter. This prevents more than one level of recursion. + invariant(!_txnContext); + + initializeTransactionContext(input); + + // Now call applyTransformation on the first relevant entry in the applyOps. + boost::optional<Document> nextDoc = extractNextApplyOpsEntry(); + invariant(nextDoc); + + return applyTransformation(*nextDoc); + } + // Any command that makes it through our filter is an invalidating command such as a + // drop. + operationType = DocumentSourceChangeStream::kInvalidateOpType; + // Make sure the result doesn't have a document key. + documentKey = Value(); + break; + } + case repl::OpTypeEnum::kNoop: { + operationType = DocumentSourceChangeStream::kNewShardDetectedOpType; + // Generate a fake document Id for NewShardDetected operation so that we can resume + // after this operation. + documentKey = Value(Document{{DocumentSourceChangeStream::kIdField, + input[repl::OplogEntry::kObject2FieldName]}}); + break; + } + default: { MONGO_UNREACHABLE; } + } + + // UUID should always be present except for invalidate entries. It will not be under + // FCV 3.4, so we should close the stream as invalid. + if (operationType != DocumentSourceChangeStream::kInvalidateOpType && uuid.missing()) { + warning() << "Saw a CRUD op without a UUID. Did Feature Compatibility Version get " + "downgraded after opening the stream?"; + operationType = DocumentSourceChangeStream::kInvalidateOpType; + fullDocument = Value(); + updateDescription = Value(); + documentKey = Value(); + } + + // Note that 'documentKey' and/or 'uuid' might be missing, in which case the missing fields will + // not appear in the output. + ResumeTokenData resumeTokenData; + if (_txnContext) { + // We're in the middle of unwinding an 'applyOps'. + + // TODO: SERVER-34314 + // For now we return an empty resumeToken. + } else { + resumeTokenData.clusterTime = ts.getTimestamp(); + resumeTokenData.documentKey = documentKey; + if (!uuid.missing()) + resumeTokenData.uuid = uuid.getUuid(); + } + + if (_txnContext) { + doc.addField(DocumentSourceChangeStream::kTxnNumberField, + Value(static_cast<long long>(_txnContext->txnNumber))); + doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid)); + } + + doc.addField(DocumentSourceChangeStream::kIdField, + Value(ResumeToken(resumeTokenData).toDocument())); + doc.addField(DocumentSourceChangeStream::kOperationTypeField, Value(operationType)); + doc.addField(DocumentSourceChangeStream::kClusterTimeField, Value(resumeTokenData.clusterTime)); + + // If we're in a sharded environment, we'll need to merge the results by their sort key, so add + // that as metadata. + if (pExpCtx->needsMerge) { + doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); + } + + // "invalidate" and "newShardDetected" entries have fewer fields. + if (operationType == DocumentSourceChangeStream::kInvalidateOpType || + operationType == DocumentSourceChangeStream::kNewShardDetectedOpType) { + return doc.freeze(); + } + + doc.addField(DocumentSourceChangeStream::kFullDocumentField, fullDocument); + doc.addField(DocumentSourceChangeStream::kNamespaceField, + Value(Document{{"db", nss.db()}, {"coll", nss.coll()}})); + doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey); + + // Note that 'updateDescription' might be the 'missing' value, in which case it will not be + // serialized. + doc.addField("updateDescription", updateDescription); + return doc.freeze(); +} + +Value DocumentSourceChangeStreamTransform::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + Document changeStreamOptions(_changeStreamSpec); + // If we're on a mongos and no other start time is specified, we want to start at the current + // cluster time on the mongos. This ensures all shards use the same start time. + if (pExpCtx->inMongos && + changeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName].missing() && + changeStreamOptions + [DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName] + .missing() && + changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] + .missing()) { + MutableDocument newChangeStreamOptions(changeStreamOptions); + + // Use the current cluster time plus 1 tick since the oplog query will include all + // operations/commands equal to or greater than the 'startAtClusterTime' timestamp. In + // particular, avoid including the last operation that went through mongos in an attempt to + // match the behavior of a replica set more closely. + auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime(); + clusterTime.addTicks(1); + newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtClusterTimeFieldName] + [ResumeTokenClusterTime::kTimestampFieldName] = + Value(clusterTime.asTimestamp()); + changeStreamOptions = newChangeStreamOptions.freeze(); + } + return Value(Document{{getSourceName(), changeStreamOptions}}); +} + +DocumentSource::GetDepsReturn DocumentSourceChangeStreamTransform::getDependencies( + DepsTracker* deps) const { + deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString()); + return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL; +} + +DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifiedPaths() const { + // All paths are modified. + return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}}; +} + +DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { + pExpCtx->checkForInterrupt(); + + // If we're unwinding an 'applyOps' from a transaction, check if there are any documents we have + // stored that can be returned. + if (_txnContext) { + boost::optional<Document> next = extractNextApplyOpsEntry(); + if (next) { + return applyTransformation(*next); + } + } + + // Get the next input document. + auto input = pSource->getNext(); + if (!input.isAdvanced()) { + return input; + } + + // Apply the transform and return the document with added fields. + return applyTransformation(input.releaseDocument()); +} + +bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) { + if (!isOpTypeRelevant(d)) { + return false; + } + + Value nsField = d["ns"]; + invariant(!nsField.missing()); + + if (_nsRegex) { + // Match all namespaces that start with db name, followed by ".", then not followed by + // '$' or 'system.' + return _nsRegex->PartialMatch(nsField.getString()); + } + + return nsField.getString() == pExpCtx->ns.ns(); +} + +boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() { + + while (_txnContext && _txnContext->pos < _txnContext->arr.size()) { + Document d = _txnContext->arr[_txnContext->pos++].getDocument(); + if (isDocumentRelevant(d)) { + return d; + } + } + + _txnContext = boost::none; + + return boost::none; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h new file mode 100644 index 00000000000..61e6bfb25eb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/field_path.h" + +namespace mongo { + +class DocumentSourceChangeStreamTransform : public DocumentSource { +public: + DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx, + BSONObj changeStreamSpec, + bool isIndependentOfAnyCollection); + Document applyTransformation(const Document& input); + DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; + DocumentSource::GetModPathsReturn getModifiedPaths() const final; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const; + DocumentSource::StageConstraints constraints(Pipeline::SplitState pipeState) const final; + DocumentSource::GetNextResult getNext(); + const char* getSourceName() const { + return DocumentSourceChangeStream::kStageName.rawData(); + } + +private: + struct DocumentKeyCacheEntry { + DocumentKeyCacheEntry() = default; + + DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn) + : documentKeyFields(documentKeyFieldsIn.first), isFinal(documentKeyFieldsIn.second){}; + // Fields of the document key, in order, including "_id" and the shard key if the + // collection is sharded. Empty until the first oplog entry with a uuid is encountered. + // Needed for transforming 'insert' oplog entries. + std::vector<FieldPath> documentKeyFields; + + // Set to true if the document key fields for this entry are definitively known and will + // not change. This implies that either the collection has become sharded or has been + // dropped. + bool isFinal; + }; + + /** + * Represents the DocumentSource's state if it's currently reading from an 'applyOps' entry + * which was created as part of a transaction. + */ + struct TransactionContext { + MONGO_DISALLOW_COPYING(TransactionContext); + + // The array of oplog entries from an 'applyOps' representing the transaction. Only kept + // around so that the underlying memory of 'arr' isn't freed. + Value opArray; + + // Array representation of the 'opArray' field. Stored like this to avoid re-typechecking + // each call to next(), or copying the entire array. + const std::vector<Value>& arr; + + // Our current place in the 'opArray'. + size_t pos; + + // Fields that were taken from the 'applyOps' oplog entry. + Document lsid; + TxnNumber txnNumber; + + TransactionContext(const Value& applyOpsVal, const Document& lsidDoc, TxnNumber n) + : opArray(applyOpsVal), arr(opArray.getArray()), pos(0), lsid(lsidDoc), txnNumber(n) {} + }; + + void initializeTransactionContext(const Document& input); + + /** + * Gets the next relevant applyOps entry that should be returned. If there is none, returns + * empty document. + */ + boost::optional<Document> extractNextApplyOpsEntry(); + + /** + * Helper for extractNextApplyOpsEntry(). Checks the namespace of the given document to see + * if it should be returned in the change stream. + */ + bool isDocumentRelevant(const Document& d); + + BSONObj _changeStreamSpec; + + // Map of collection UUID to document key fields. + std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache; + + // Regex for matching the "ns" field in applyOps sub-entries. Only used when we have a + // change stream on the entire DB. When watching just a single collection, this field is + // boost::none, and an exact string equality check is used instead. + boost::optional<pcrecpp::RE> _nsRegex; + + // Represents if the current 'applyOps' we're unwinding, if any. + boost::optional<TransactionContext> _txnContext; + + // Set to true if this transformation stage can be run on the collectionless namespace. + bool _isIndependentOfAnyCollection; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 5d0e850d3fd..477972ffdde 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -57,7 +57,6 @@ public: kInclusionProjection, kComputedProjection, kReplaceRoot, - kChangeStreamTransformation, }; virtual ~TransformerInterface() = default; virtual Document applyTransformation(const Document& input) = 0; @@ -104,23 +103,14 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - StageConstraints constraints( - StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kNone, - DiskUseRequirement::kNoDiskUse, - (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation - ? FacetRequirement::kNotAllowed - : FacetRequirement::kAllowed), - (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation - ? TransactionRequirement::kNotAllowed - : TransactionRequirement::kAllowed), - (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation - ? ChangeStreamRequirement::kChangeStreamStage - : ChangeStreamRequirement::kWhitelist)); - + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, + ChangeStreamRequirement::kWhitelist); constraints.canSwapWithMatch = true; constraints.canSwapWithLimit = true; // This transformation could be part of a 'collectionless' change stream on an entire |