author     Arun Banala <arun.banala@mongodb.com>             2022-02-08 13:44:49 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-02-22 12:52:38 +0000
commit     af85e60d19d1a975a7babf5300c6d336e474e40a (patch)
tree       77947d7687a43d749fee2393f78c683fd050135a /src/mongo/db/pipeline
parent     b6485b14e3749d1c6bc506fa5a5a3fd9b169af74 (diff)
SERVER-62801 Add change stream event for create operation
Diffstat (limited to 'src/mongo/db/pipeline')
15 files changed, 505 insertions, 241 deletions
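Editor's note: in short, this commit surfaces collection creation as a first-class change stream event (guarded behind the 'showExpandedEvents' flag) and, to make such non-CRUD events resumable, generalizes the resume token's 'documentKey' into an 'eventIdentifier'. The following standalone C++ sketch illustrates that second idea; it uses std::map as a stand-in for BSON documents and 'makeEventIdentifier' as a simplified, hypothetical stand-in for the ResumeToken machinery, not the actual MongoDB API:

```cpp
#include <iostream>
#include <map>
#include <string>

using Doc = std::map<std::string, std::string>;

// Simplified stand-in for the eventIdentifier concept: CRUD events keep
// identifying themselves by documentKey, while the new expanded events
// (such as 'create') are identified by operationType + operationDescription.
Doc makeEventIdentifier(const std::string& opType,
                        const Doc& documentKey,
                        const Doc& opDescription) {
    if (!documentKey.empty()) {
        return documentKey;  // CRUD events: unchanged behavior.
    }
    Doc identifier{{"operationType", opType}};
    for (const auto& [field, value] : opDescription) {
        identifier["operationDescription." + field] = value;
    }
    return identifier;
}

int main() {
    // An insert is still identified by its documentKey...
    Doc insertId = makeEventIdentifier("insert", {{"_id", "123"}}, {});
    // ...while a 'create' event has no documentKey and uses its description.
    Doc createId = makeEventIdentifier("create", {}, {{"idIndex", "{v: 2}"}});
    std::cout << insertId.size() << " " << createId.size() << "\n";  // 1 2
}
```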
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 8ab33cce834..dcab512efb5 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -32,6 +32,7 @@
 #include "mongo/db/bson/bson_helper.h"
 #include "mongo/db/matcher/expression_always_boolean.h"
 #include "mongo/db/matcher/expression_parser.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
 #include "mongo/db/pipeline/change_stream_rewrite_helpers.h"
 #include "mongo/db/pipeline/document_source_change_stream.h"
 #include "mongo/db/pipeline/pipeline.h"
@@ -104,11 +105,13 @@ std::unique_ptr<MatchExpression> buildOperationFilter(
     auto renameFromEvent = BSON("o.renameCollection" << BSONRegEx(nsRegex));
     auto renameToEvent =
         BSON("o.renameCollection" << BSON("$exists" << true) << "o.to" << BSONRegEx(nsRegex));
+    const auto createEvent = BSON("o.create" << BSONRegEx(collRegex));
 
     auto orCmdEvents = std::make_unique<OrMatchExpression>();
     orCmdEvents->add(MatchExpressionParser::parseAndNormalize(dropEvent, expCtx));
     orCmdEvents->add(MatchExpressionParser::parseAndNormalize(renameFromEvent, expCtx));
     orCmdEvents->add(MatchExpressionParser::parseAndNormalize(renameToEvent, expCtx));
+    orCmdEvents->add(MatchExpressionParser::parseAndNormalize(createEvent, expCtx));
 
     // Omit dropDatabase on single-collection streams. While the stream will be invalidated before
     // it sees this event, the user will incorrectly see it if they startAfter the invalidate.
@@ -182,12 +185,20 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
         // 'prevOpTime' link to another 'applyOps' command, indicating a multi-entry transaction.
         BSONArrayBuilder orBuilder(applyOpsBuilder.subarrayStart("$or"));
         {
-            {
-                BSONObjBuilder nsMatchBuilder(orBuilder.subobjStart());
-                nsMatchBuilder.append(
-                    "o.applyOps.ns"_sd,
-                    BSONRegEx(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)));
-            }
+            // Regexes for full-namespace, collection, and command-namespace matching.
+            auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns);
+            auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns);
+            auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx->ns);
+
+            // Match relevant CRUD events on the monitored namespaces.
+            orBuilder.append(BSON("o.applyOps.ns" << BSONRegEx(nsRegex)));
+
+            // Match relevant command events on the monitored namespaces.
+            orBuilder.append(BSON(
+                "o.applyOps" << BSON(
+                    "$elemMatch" << BSON("ns" << BSONRegEx(cmdNsRegex)
+                                              << OR(BSON("o.create" << BSONRegEx(collRegex)))))));
+
             // The default repl::OpTime is the value used to indicate a null "prevOpTime" link.
             orBuilder.append(BSON(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName
                                   << BSON("$ne" << repl::OpTime().toBSON())));
@@ -243,18 +254,9 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
 }
 
 BSONObj getMatchFilterForClassicOperationTypes() {
-    return BSON(DocumentSourceChangeStream::kOperationTypeField << BSON(
-        "$in" << BSON_ARRAY(DocumentSourceChangeStream::kUpdateOpType
-                            << DocumentSourceChangeStream::kDeleteOpType
-                            << DocumentSourceChangeStream::kReplaceOpType
-                            << DocumentSourceChangeStream::kInsertOpType
-                            << DocumentSourceChangeStream::kDropCollectionOpType
-                            << DocumentSourceChangeStream::kRenameCollectionOpType
-                            << DocumentSourceChangeStream::kDropDatabaseOpType
-                            << DocumentSourceChangeStream::kInvalidateOpType
-                            << DocumentSourceChangeStream::kReshardBeginOpType
-                            << DocumentSourceChangeStream::kReshardDoneCatchUpOpType
-                            << DocumentSourceChangeStream::kNewShardDetectedOpType)));
+    return BSON(DocumentSourceChangeStream::kOperationTypeField
+                << BSON("$in" << change_stream_legacy::kClassicOperationTypes));
 }
+
 }  // namespace change_stream_filter
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.h b/src/mongo/db/pipeline/change_stream_filter_helpers.h
index 3ef643a9543..1c57c0787ae 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.h
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.h
@@ -94,8 +94,7 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
     const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch);
 
 /**
- * Returns the match filter for the classic changestream operationTypes i.e. the operations that
- * are NOT guarded behind the 'showExpandedEvents' flag.
+ * Returns the match filter for the classic changestream operations.
  */
 BSONObj getMatchFilterForClassicOperationTypes();
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index ea1df1ae1a1..d207ef22714 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -131,10 +131,16 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
 
 boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
     const ResumeTokenData& tokenData) {
-    if (!tokenData.documentKey.missing() && tokenData.uuid) {
-        std::vector<FieldPath> docKeyFields;
-        auto docKey = tokenData.documentKey.getDocument();
+    if (!tokenData.eventIdentifier.missing() && tokenData.uuid) {
+        auto docKey = tokenData.eventIdentifier.getDocument();
+
+        // Newly added events store their operationType and operationDescription as the
+        // eventIdentifier, not a documentKey.
+        if (docKey["_id"].missing()) {
+            return {};
+        }
 
+        std::vector<FieldPath> docKeyFields;
         auto iter = docKey.fieldIterator();
         while (iter.more()) {
             auto fieldPair = iter.next();
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index 0ac51bab7ac..2c30cef0a91 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -55,4 +55,21 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
 boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
     const ResumeTokenData& data);
 
+/**
+ * Represents the change stream operation types that are NOT guarded behind the
+ * 'showExpandedEvents' flag.
+ */
+static const std::set<StringData> kClassicOperationTypes =
+    std::set<StringData>{DocumentSourceChangeStream::kUpdateOpType,
+                         DocumentSourceChangeStream::kDeleteOpType,
+                         DocumentSourceChangeStream::kReplaceOpType,
+                         DocumentSourceChangeStream::kInsertOpType,
+                         DocumentSourceChangeStream::kDropCollectionOpType,
+                         DocumentSourceChangeStream::kRenameCollectionOpType,
+                         DocumentSourceChangeStream::kDropDatabaseOpType,
+                         DocumentSourceChangeStream::kInvalidateOpType,
+                         DocumentSourceChangeStream::kReshardBeginOpType,
+                         DocumentSourceChangeStream::kReshardDoneCatchUpOpType,
+                         DocumentSourceChangeStream::kNewShardDetectedOpType};
+
 }  // namespace mongo::change_stream_legacy
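Editor's note: the guard added to buildDocumentKeyCache() above is worth calling out: the legacy resume path may only build a documentKey cache when the token's eventIdentifier really is a documentKey, which it detects via the presence of '_id'. A minimal standalone sketch of that check, assuming std::map as a stand-in for the Document/ResumeTokenData types (not the MongoDB classes):

```cpp
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

using Doc = std::map<std::string, std::string>;

// Stand-in for buildDocumentKeyCache(): return the documentKey field names,
// or nothing when the eventIdentifier is not a documentKey at all.
std::optional<std::vector<std::string>> buildDocumentKeyCache(const Doc& eventIdentifier) {
    // Newly added events (e.g. 'create') store operationType and
    // operationDescription as the eventIdentifier and never carry '_id'.
    if (eventIdentifier.find("_id") == eventIdentifier.end()) {
        return std::nullopt;
    }
    std::vector<std::string> docKeyFields;
    for (const auto& entry : eventIdentifier) {
        docKeyFields.push_back(entry.first);
    }
    return docKeyFields;
}

int main() {
    std::cout << buildDocumentKeyCache({{"_id", "1"}, {"shardKey", "a"}}).has_value()       // 1
              << buildDocumentKeyCache({{"operationType", "create"}}).has_value() << "\n";  // 0
}
```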
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index d080be71902..75ee3ccf1c4 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -108,6 +108,7 @@ std::unique_ptr<MatchExpression> matchRewriteOperationType(
         {"update", {{"op", "u"_sd}, {"o._id"_sd, kExistsFalse}}},
         {"replace", {{"op", "u"_sd}, {"o._id"_sd, kExistsTrue}}},
         {"drop", {{"op", "c"_sd}, {"o.drop"_sd, kExistsTrue}}},
+        {"create", {{"op", "c"_sd}, {"o.create"_sd, kExistsTrue}}},
         {"rename", {{"op", "c"_sd}, {"o.renameCollection"_sd, kExistsTrue}}},
         {"dropDatabase", {{"op", "c"_sd}, {"o.dropDatabase"_sd, kExistsTrue}}}};
@@ -207,6 +208,7 @@ boost::intrusive_ptr<Expression> exprRewriteOperationType(
         fromjson("{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: 'dropDatabase'}"));
     opCases.push_back(
         fromjson("{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: 'rename'}"));
+    opCases.push_back(fromjson("{case: {$ne: ['$o.create', '$$REMOVE']}, then: 'create'}"));
 
     // The default case, if nothing matches.
     auto defaultCase = ExpressionConstant::create(expCtx.get(), Value())->serialize(false);
@@ -1005,6 +1007,12 @@ std::unique_ptr<MatchExpression> matchRewriteNs(
     tassert(5554104, "Unexpected rewrite failure", dropNsRewrite);
     cmdCases->add(std::move(dropNsRewrite));
 
+    // The 'create' event is rewritten to the cmdNs in 'ns' and the collection name in 'o.create'.
+    auto createNsRewrite = matchRewriteGenericNamespace(
+        expCtx, predicate, "ns"_sd, true /* nsFieldIsCmdNs */, "o.create"_sd);
+    tassert(6280101, "Unexpected rewrite failure", createNsRewrite);
+    cmdCases->add(std::move(createNsRewrite));
+
     // The 'dropDatabase' event is rewritten to the cmdNs in 'ns'. It does not have a collection
     // field.
     auto dropDbNsRewrite =
@@ -1094,6 +1102,7 @@ boost::intrusive_ptr<Expression> exprRewriteNs(
     collCases.push_back(fromjson(str::stream()
                                  << "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: "
                                  << getCollFromNSField("o.renameCollection") << "}"));
+    collCases.push_back(fromjson("{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}"));
 
     // The default case, if nothing matches.
     auto defaultCase = ExpressionConstant::create(expCtx.get(), Value())->serialize(false);
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 5db609a4627..a3dfc14d0dc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -133,6 +133,10 @@ public:
     // after the transformation.
     static constexpr StringData kDocumentKeyField = "documentKey"_sd;
 
+    // The name of the field where the operation description of the non-CRUD operations will be
+    // located. This is complementary to the 'documentKey' for CRUD operations.
+    static constexpr StringData kOperationDescriptionField = "operationDescription"_sd;
+
     // The name of the field where the pre-image document will be found, if requested and available.
     static constexpr StringData kFullDocumentBeforeChangeField = "fullDocumentBeforeChange"_sd;
 
@@ -199,10 +203,11 @@ public:
     // UUID of a collection corresponding to the event (if applicable).
     static constexpr StringData kCollectionUuidField = "collectionUUID"_sd;
 
-    // Object with additional description of operation.
-    static constexpr StringData kOperationDescriptionField = "operationDescription"_sd;
-
+    //
     // The different types of operations we can use for the operation type.
+    //
+
+    // The classic change events.
     static constexpr StringData kUpdateOpType = "update"_sd;
     static constexpr StringData kDeleteOpType = "delete"_sd;
     static constexpr StringData kReplaceOpType = "replace"_sd;
@@ -211,11 +216,16 @@ public:
     static constexpr StringData kRenameCollectionOpType = "rename"_sd;
     static constexpr StringData kDropDatabaseOpType = "dropDatabase"_sd;
     static constexpr StringData kInvalidateOpType = "invalidate"_sd;
+
+    // The internal change events that are not exposed to the users.
     static constexpr StringData kReshardBeginOpType = "reshardBegin"_sd;
     static constexpr StringData kReshardDoneCatchUpOpType = "reshardDoneCatchUp"_sd;
     // Internal op type to signal mongos to open cursors on new shards.
     static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
 
+    // These events are guarded behind the 'showExpandedEvents' flag.
+    static constexpr StringData kCreateOpType = "create"_sd;
+
     static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
     static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd;
     static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index abac48c6ae8..cc3787a44ba 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -109,16 +109,16 @@ DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
     }
 
     // If all the fields match exactly, then we have found the token.
-    if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
-                                            tokenDataFromClient.documentKey)) {
+    if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.eventIdentifier ==
+                                            tokenDataFromClient.eventIdentifier)) {
         return ResumeStatus::kFoundToken;
     }
 
-    // At this point, we know that the tokens differ only by documentKey. The status we return will
-    // depend on whether the stream token is logically before or after the client token. If the
+    // At this point, we know that the tokens differ only by eventIdentifier. The status we return
+    // will depend on whether the stream token is logically before or after the client token. If the
     // latter, then we will never see the resume token and the stream cannot be resumed.
-    return ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
-                                               tokenDataFromClient.documentKey)
+    return ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.eventIdentifier >
+                                               tokenDataFromClient.eventIdentifier)
         ? ResumeStatus::kSurpassedToken
         : ResumeStatus::kCheckNextDoc;
 }
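Editor's note: the resumability check above is unchanged in spirit; only the field it compares was renamed. A minimal sketch of the three-way outcome, with an int standing in for the Value-ordered eventIdentifier (simplified, not the MongoDB types):

```cpp
#include <iostream>

enum class ResumeStatus { kFoundToken, kSurpassedToken, kCheckNextDoc };

ResumeStatus compareEventIdentifiers(int fromResumedStream, int fromClient) {
    if (fromResumedStream == fromClient) {
        return ResumeStatus::kFoundToken;
    }
    // If the stream is already past the client token, it can never be found.
    return fromResumedStream > fromClient ? ResumeStatus::kSurpassedToken
                                          : ResumeStatus::kCheckNextDoc;
}

int main() {
    std::cout << (compareEventIdentifiers(2, 1) == ResumeStatus::kSurpassedToken) << "\n";  // 1
}
```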
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 3914a0c72da..b844b7a615a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -29,6 +29,7 @@
 
 #include "mongo/platform/basic.h"
 
+#include <boost/algorithm/string/join.hpp>
 #include <boost/intrusive_ptr.hpp>
 #include <memory>
 #include <vector>
@@ -304,7 +305,7 @@ public:
                                  size_t txnOpIndex = 0) {
         ResumeTokenData tokenData;
         tokenData.clusterTime = ts;
-        tokenData.documentKey = docKey;
+        tokenData.eventIdentifier = docKey;
         tokenData.fromInvalidate = fromInvalidate;
         tokenData.txnOpIndex = txnOpIndex;
         if (!uuid.missing())
@@ -316,7 +317,8 @@ public:
      * Helper for running an applyOps through the pipeline, and getting all of the results.
      */
     std::vector<Document> getApplyOpsResults(const Document& applyOpsDoc,
-                                             const LogicalSessionFromClient& lsid) {
+                                             const LogicalSessionFromClient& lsid,
+                                             BSONObj spec = kDefaultSpec) {
         BSONObj applyOpsObj = applyOpsDoc.toBson();
 
         // Create an oplog entry and then glue on an lsid and txnNumber
@@ -332,7 +334,7 @@ public:
         BSONObj oplogEntry = builder.done();
 
         // Create the stages and check that the documents produced matched those in the applyOps.
-        vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec);
+        vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, spec);
         auto transform = stages[3].get();
         invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr);
 
@@ -1216,6 +1218,7 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) {
         {DSChangeStream::kWallTimeField, Date_t()},
         {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
     };
+
     Document expectedInvalidate{
         {DSChangeStream::kIdField,
          makeResumeToken(
@@ -1228,6 +1231,33 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) {
     checkTransformation(dropColl, expectedDrop, kShowExpandedEventsSpec, expectedInvalidate);
 }
 
+TEST_F(ChangeStreamStageTest, TransformCreate) {
+    RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+
+    OplogEntry create =
+        createCommand(BSON("create" << nss.coll() << "idIndex"
+                                    << BSON("v" << 2 << "key" << BSON("id" << 1)) << "name"
+                                    << "_id_"),
+                      testUuid());
+
+    const auto expectedOpDescription = fromjson("{idIndex: {v: 2, key: {id: 1}}, name: '_id_'}");
+    Document expectedCreate{
+        {DSChangeStream::kIdField,
+         makeResumeToken(kDefaultTs,
+                         testUuid(),
+                         ResumeTokenData::makeEventIdentifierFromOpDescription(
+                             DSChangeStream::kCreateOpType, Value(expectedOpDescription)))},
+        {DSChangeStream::kOperationTypeField, DSChangeStream::kCreateOpType},
+        {DSChangeStream::kClusterTimeField, kDefaultTs},
+        {DSChangeStream::kCollectionUuidField, testUuid()},
+        {DSChangeStream::kWallTimeField, Date_t()},
+        {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+        {DSChangeStream::kOperationDescriptionField, Value(expectedOpDescription)},
+    };
+
+    checkTransformation(create, expectedCreate, kShowExpandedEventsSpec);
+}
+
 TEST_F(ChangeStreamStageTest, TransformRename) {
     NamespaceString otherColl("test.bar");
     OplogEntry rename =
@@ -1265,17 +1295,17 @@ TEST_F(ChangeStreamStageTest, TransformRenameShowExpandedEvents) {
     Document expectedRename{
         {DSChangeStream::kRenameTargetNssField,
          D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
-        {DSChangeStream::kOperationDescriptionField,
-         D{
-             {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
-             {"dropTarget", dropTarget},
-         }},
         {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
         {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
         {DSChangeStream::kClusterTimeField, kDefaultTs},
         {DSChangeStream::kCollectionUuidField, testUuid()},
         {DSChangeStream::kWallTimeField, Date_t()},
         {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+        {DSChangeStream::kOperationDescriptionField,
+         D{
+             {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
+             {"dropTarget", dropTarget},
+         }},
     };
     Document expectedInvalidate{
         {DSChangeStream::kIdField,
@@ -2250,6 +2280,60 @@ TEST_F(ChangeStreamStageTest, TransformApplyOps) {
     // The third document is skipped.
 }
 
+TEST_F(ChangeStreamStageTest, TransformApplyOpsWithCreateOperation) {
+    RAIIServerParameterControllerForTest controller("featureFlagChangeStreamsVisibility", true);
+
+    // Doesn't use the checkTransformation() pattern that other tests use since we expect multiple
+    // documents to be returned from one applyOps.
+
+    Document idIndexDef = Document{{"v", 2}, {"key", D{{"_id", 1}}}};
+    Document applyOpsDoc{
+        {"applyOps",
+         Value{std::vector<Document>{
+             Document{{"op", "c"_sd},
+                      {"ns", nss.db() + ".$cmd"},
+                      {"ui", testUuid()},
+                      {"o", Value{Document{{"create", nss.coll()}, {"idIndex", idIndexDef}}}},
+                      {"ts", Timestamp(0, 1)}},
+             Document{{"op", "i"_sd},
+                      {"ns", nss.ns()},
+                      {"ui", testUuid()},
+                      {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}},
+             Document{
+                 {"op", "c"_sd},
+                 {"ns", nss.db() + ".$cmd"},
+                 {"ui", UUID::gen()},
+                 // Operation on another collection which should be skipped.
+                 {"o", Value{Document{{"create", "otherCollection"_sd}, {"idIndex", idIndexDef}}}}},
+         }}},
+    };
+    LogicalSessionFromClient lsid = testLsid();
+    vector<Document> results = getApplyOpsResults(applyOpsDoc, lsid, kShowExpandedEventsSpec);
+
+    // Only the create operation on the other collection should be skipped.
+    ASSERT_EQ(results.size(), 2u);
+
+    // Check that the first document is correct.
+    auto nextDoc = results[0];
+    ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL);
+    ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+              DSChangeStream::kCreateOpType);
+    ASSERT_VALUE_EQ(nextDoc[DSChangeStream::kOperationDescriptionField],
+                    Value(Document{{"idIndex", idIndexDef}}));
+    ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+
+    // Check the second document.
+    nextDoc = results[1];
+    ASSERT_EQ(nextDoc["txnNumber"].getLong(), 0LL);
+    ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+              DSChangeStream::kInsertOpType);
+    ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+    ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["x"].getString(), "hallo");
+    ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+
+    // The third document is skipped.
+}
+
 TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
     const Timestamp ts(3, 45);
     const long long term = 4;
@@ -2310,10 +2394,9 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
     checkTransformation(rename, expectedRename);
 }
 
-TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) {
-    auto collSpec =
-        D{{"create", "foo"_sd},
-          {"idIndex", D{{"v", 2}, {"key", D{{"_id", 1}}}, {"name", "_id_"_sd}, {"ns", nss.ns()}}}};
+TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollectionWhenShowExpandedEventsOff) {
+    auto collSpec = D{{"create", "foo"_sd},
+                      {"idIndex", D{{"v", 2}, {"key", D{{"_id", 1}}}, {"name", "_id_"_sd}}}};
     OplogEntry createColl = createCommand(collSpec.toBson(), testUuid());
     checkTransformation(createColl, boost::none);
 }
@@ -4371,7 +4454,7 @@ public:
     }
 
     std::string getNsCollRegexMatchExpr(const std::string& field, const std::string& regex) {
-        if (field == "$o.drop") {
+        if (field == "$o.drop" || field == "$o.create") {
             return str::stream()
                 << "{$expr: {$let: {vars: {oplogField: {$cond: [{ $eq: [{ $type: ['" << field
                 << "']}, {$const: 'string'}]}, '" << field
@@ -4749,6 +4832,20 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeDrop) {
                       fromjson("{$and: [{op: {$eq: 'c'}}, {'o.drop': {$exists: true}}]}"));
 }
 
+TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeCreate) {
+    auto spec = fromjson("{operationType: 'create'}");
+    auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
+    ASSERT_OK(statusWithMatchExpression.getStatus());
+
+    auto rewrittenMatchExpression = change_stream_rewrite::rewriteFilterForFields(
+        getExpCtx(), statusWithMatchExpression.getValue().get(), {"operationType"});
+    ASSERT(rewrittenMatchExpression);
+
+    auto rewrittenPredicate = rewrittenMatchExpression->serialize();
+    ASSERT_BSONOBJ_EQ(rewrittenPredicate,
+                      fromjson("{$and: [{op: {$eq: 'c'}}, {'o.create': {$exists: true}}]}"));
+}
+
 TEST_F(ChangeStreamRewriteTest, CanRewriteEqPredicateOnOperationTypeRename) {
     auto spec = fromjson("{operationType: 'rename'}");
     auto statusWithMatchExpression = MatchExpressionParser::parse(spec, getExpCtx());
@@ -4816,6 +4913,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteEqNullPredicateOnOperationTypeSubField
 TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) {
     auto expr = BSON("operationType" << BSON("$in" << BSON_ARRAY("drop"
+                                                                 << "create"
                                                                  << "insert")));
     auto statusWithMatchExpression = MatchExpressionParser::parse(expr, getExpCtx());
     ASSERT_OK(statusWithMatchExpression.getStatus());
@@ -4826,7 +4924,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteInPredicateOnOperationType) {
 
     auto rewrittenPredicate = rewrittenMatchExpression->serialize();
     ASSERT_BSONOBJ_EQ(rewrittenPredicate,
-                      BSON(OR(fromjson("{$and: [{op: {$eq: 'c'}}, {'o.drop': {$exists: true}}]}"),
+                      BSON(OR(fromjson("{$and: [{op: {$eq: 'c'}}, {'o.create': {$exists: true}}]}"),
+                              fromjson("{$and: [{op: {$eq: 'c'}}, {'o.drop': {$exists: true}}]}"),
                               fromjson("{op: {$eq: 'i'}}"))));
 }
@@ -4963,7 +5062,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteExprWithOperationType) {
                     case: {$ne: ['$o.dropDatabase', '$$REMOVE']},
                     then: {$const: 'dropDatabase'}
                 },
-                {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}}
+                {case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$const: 'rename'}},
+                {case: {$ne: ['$o.create', '$$REMOVE']}, then: {$const: 'create'}}
             ],
             default: '$$REMOVE'
         }
@@ -5434,6 +5534,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteFullNamespaceObject) {
BSON(OR(BSON("o.renameCollection" << BSON("$eq" << ns)), BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)), BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())))), + BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)), + BSON("o.create" << BSON("$eq" << expCtx->ns.coll())))), BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5453,8 +5555,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithSwappedField) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5476,8 +5579,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyDbField) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1}"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5497,8 +5601,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithOnlyCollectionField rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5518,8 +5623,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidDbField) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5539,8 +5645,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithInvalidCollField) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5560,8 +5667,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceObjectWithExtraField) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop. 
+ fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5587,7 +5695,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithStringDbFieldPath) { BSON("ns" << BSON("$regex" << regexNs)))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)), - BSON("ns" << BSON("$eq" << cmdNs)), + BSON("ns" << BSON("$eq" << cmdNs)), // drop. + BSON("ns" << BSON("$eq" << cmdNs)), // create. BSON(AND(BSON("ns" << BSON("$eq" << cmdNs)), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5613,6 +5722,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithCollectionFieldPath) { BSON(AND(fromjson("{op: {$eq: 'c'}}"), BSON(OR(BSON("o.renameCollection" << BSON("$regex" << regexNs)), BSON("o.drop" << BSON("$eq" << expCtx->ns.coll())), + BSON("o.create" << BSON("$eq" << expCtx->ns.coll())), BSON(AND(fromjson("{$alwaysFalse: 1}"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5635,7 +5745,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexDbFieldPath) { fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^unit.*$)")), - fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), // drop. + fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), // create. BSON(AND(fromjson(getNsDbRegexMatchExpr("$ns", R"(^unit.*$)")), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5659,6 +5770,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithRegexCollectionFieldPath) BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^pipeline.*$)")), fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^pipeline.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.create", R"(^pipeline.*$)")), BSON(AND(fromjson("{ $alwaysFalse: 1 }"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5699,8 +5811,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraDbFieldPath) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop + fromjson("{$alwaysFalse: 1 }"), // rename + fromjson("{$alwaysFalse: 1 }"), // create BSON(AND(fromjson("{$alwaysFalse: 1 }"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5721,8 +5834,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithExtraCollectionFieldPath) rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop + fromjson("{$alwaysFalse: 1 }"), // rename + fromjson("{$alwaysFalse: 1 }"), // create BSON(AND(fromjson("{$alwaysFalse: 1 }"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5743,8 +5857,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInvalidFieldPath) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // drop + 
fromjson("{$alwaysFalse: 1 }"), // rename + fromjson("{$alwaysFalse: 1 }"), // create BSON(AND(fromjson("{$alwaysFalse: 1 }"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5776,7 +5891,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDb) { fromjson("{op: {$eq: 'c'}}"), BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)), BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)))), - BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), + BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), // drop. + BSON("ns" << BSON("$eq" << secondCmdNs)))), + BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), // create. BSON("ns" << BSON("$eq" << secondCmdNs)))), BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << firstCmdNs)), BSON("ns" << BSON("$eq" << secondCmdNs)))), @@ -5811,7 +5928,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDb) { BSON(AND(fromjson("{op: {$eq: 'c'}}"), BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), BSON("o.renameCollection" << BSON("$regex" << firstRegexNs)))), - BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), // drop. + BSON("ns" << BSON("$eq" << firstCmdNs)))), + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), // create. BSON("ns" << BSON("$eq" << firstCmdNs)))), BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), BSON("ns" << BSON("$eq" << firstCmdNs)))), @@ -5847,6 +5966,10 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollection) << "news")), BSON("o.drop" << BSON("$eq" << "test")))), + BSON(OR(BSON("o.create" << BSON("$eq" + << "news")), + BSON("o.create" << BSON("$eq" + << "test")))), BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); @@ -5882,6 +6005,10 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnCollection << "news")), BSON("o.drop" << BSON("$eq" << "test")))), + BSON(OR(BSON("o.create" << BSON("$eq" + << "news")), + BSON("o.create" << BSON("$eq" + << "test")))), BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); @@ -5909,7 +6036,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnDb) { BSON(OR( BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")), fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))), - BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // drop. + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // create. fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), @@ -5938,7 +6067,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnDb) { BSON(OR( BSON(OR(fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")), fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^news$)")))), - BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // drop. + fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), + BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), // create. 
fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), BSON(AND(BSON(OR(fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")), fromjson(getNsDbRegexMatchExpr("$ns", R"(^news$)")))), @@ -5969,6 +6100,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInRegexExpressionOnCollec fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))), BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")), fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.create", R"(^news$)")))), BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -5997,6 +6130,8 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinRegexExpressionOnColle fromjson(getNsCollRegexMatchExpr("$o.renameCollection", R"(^news$)")))), BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")), fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^news$)")))), + BSON(OR(fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")), + fromjson(getNsCollRegexMatchExpr("$o.create", R"(^news$)")))), BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); } @@ -6028,7 +6163,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnDbWithRegex BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))), BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), - fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // drop. + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // create. BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); @@ -6062,7 +6199,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithNinExpressionOnDbWithRege BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), fromjson(getNsDbRegexMatchExpr("$o.renameCollection", R"(^test.*$)")))), BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), - fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // drop. + BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), + fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), // create. 
BSON(AND(BSON(OR(BSON("ns" << BSON("$eq" << secondCmdNs)), fromjson(getNsDbRegexMatchExpr("$ns", R"(^test.*$)")))), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); @@ -6085,20 +6224,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithInExpressionOnCollectionW auto rewrittenPredicate = rewrittenMatchExpression->serialize(); ASSERT_BSONOBJ_EQ( rewrittenPredicate, - BSON(OR( - BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), - BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), - fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))), - BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), - fromjson(getNsCollRegexMatchExpr("$o.renameCollection", - R"(^test.*$)")))), - BSON(OR(BSON("o.drop" << BSON("$eq" - << "news")), - fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))), - BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), - fromjson("{$alwaysFalse: 1}"))), - fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); + BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), + BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))), + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$o.renameCollection", + R"(^test.*$)")))), + BSON(OR(BSON("o.drop" << BSON("$eq" + << "news")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))), + BSON(OR(BSON("o.create" << BSON("$eq" + << "news")), + fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")))), + BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), + fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } TEST_F(ChangeStreamRewriteTest, @@ -6123,16 +6265,20 @@ TEST_F(ChangeStreamRewriteTest, BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), BSON(OR(BSON("ns" << BSON("$regex" << secondRegexNs)), fromjson(getNsCollRegexMatchExpr("$ns", R"(^test.*$)")))))), - BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), - fromjson(getNsCollRegexMatchExpr("$o.renameCollection", - R"(^test.*$)")))), - BSON(OR(BSON("o.drop" << BSON("$eq" - << "news")), - fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))), - BSON(AND(BSON(OR(fromjson("{$alwaysFalse: 1}"), - fromjson("{$alwaysFalse: 1}"))), - fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); + BSON(AND( + fromjson("{op: {$eq: 'c'}}"), + BSON(OR(BSON(OR(BSON("o.renameCollection" << BSON("$regex" << secondRegexNs)), + fromjson(getNsCollRegexMatchExpr("$o.renameCollection", + R"(^test.*$)")))), + BSON(OR(BSON("o.drop" << BSON("$eq" + << "news")), + fromjson(getNsCollRegexMatchExpr("$o.drop", R"(^test.*$)")))), + BSON(OR(BSON("o.create" << BSON("$eq" + << "news")), + fromjson(getNsCollRegexMatchExpr("$o.create", R"(^test.*$)")))), + BSON(AND( + BSON(OR(fromjson("{$alwaysFalse: 1}"), fromjson("{$alwaysFalse: 1}"))), + fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); } TEST_F(ChangeStreamRewriteTest, CannotRewriteNamespaceWithInExpressionOnInvalidDb) { @@ -6194,8 +6340,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyInExpression) { rewrittenPredicate, BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // rename. 
+ fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1 }"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))); } @@ -6216,8 +6363,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNamespaceWithEmptyNinExpression) { BSON(NOR( BSON(OR(BSON(AND(fromjson("{op: {$not: {$eq: 'c'}}}"), fromjson("{$alwaysFalse: 1 }"))), BSON(AND(fromjson("{op: {$eq: 'c'}}"), - BSON(OR(fromjson("{$alwaysFalse: 1 }"), - fromjson("{$alwaysFalse: 1 }"), + BSON(OR(fromjson("{$alwaysFalse: 1 }"), // rename. + fromjson("{$alwaysFalse: 1 }"), // drop. + fromjson("{$alwaysFalse: 1 }"), // create. BSON(AND(fromjson("{$alwaysFalse: 1 }"), fromjson("{'o.dropDatabase': {$eq: 1}}")))))))))))); } @@ -6233,17 +6381,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObject) { expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); ASSERT(rewrittenMatchExpression); - auto case1 = + auto caseCRUD = "{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: " "{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: " - "-1}]}}"; - auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}"; - auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"; - auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"; - auto case5 = + "-1}]}}"s; + auto caseNotCmd = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}"; + auto caseDrop = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"; + auto caseDropDatabase = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"; + auto caseRenameCollection = "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: " "['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: " "-1}]}}"; + auto caseCreate = "{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}"; + + const auto cases = boost::algorithm::join( + std::vector<std::string>{ + caseCRUD, caseNotCmd, caseDrop, caseDropDatabase, caseRenameCollection, caseCreate}, + ","); auto expectedExpr = fromjson( "{" @@ -6260,7 +6414,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObject) { " coll: {" " $switch: {" " branches: ["s + - " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 + + cases + " ], default: '$$REMOVE'}}}}}, " " {db: {$const: 'unittests' }, coll: {$const: 'pipeline_test'}}]}" "}"); @@ -6279,17 +6433,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObjectWithOnlyDb) { expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); ASSERT(rewrittenMatchExpression); - auto case1 = + auto caseCRUD = "{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: " "{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: " - "-1}]}}"; - auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}"; - auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"; - auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"; - auto case5 = + "-1}]}}"s; + auto caseNotCmd = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}"; + auto caseDrop = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"; + auto caseDropDatabase = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"; + auto caseRenameCollection = "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: " "['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, 
{$const: 1}]}, {$const: " "-1}]}}"; + auto caseCreate = "{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}"; + + const auto cases = boost::algorithm::join( + std::vector<std::string>{ + caseCRUD, caseNotCmd, caseDrop, caseDropDatabase, caseRenameCollection, caseCreate}, + ","); auto expectedExpr = fromjson( "{" @@ -6306,7 +6466,7 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnFullObjectWithOnlyDb) { " coll: {" " $switch: {" " branches: ["s + - " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 + + cases + " ], default: '$$REMOVE'}}}}}, " " {db: {$const: 'unittests' }}]}" "}"); @@ -6353,17 +6513,23 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnCollFieldPath) { expCtx, statusWithMatchExpression.getValue().get(), {"ns"}); ASSERT(rewrittenMatchExpression); - auto case1 = + auto caseCRUD = "{case: {$in: ['$op', [{$const: 'i' }, {$const: 'u' }, {$const: 'd'}]]}, then: " "{$substrBytes: ['$ns', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: " - "-1}]}}"; - auto case2 = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}"; - auto case3 = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"; - auto case4 = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"; - auto case5 = + "-1}]}}"s; + auto caseNotCmd = "{case: { $ne: ['$op', {$const: 'c'}]}, then: '$$REMOVE'}"; + auto caseDrop = "{case: {$ne: ['$o.drop', '$$REMOVE']}, then: '$o.drop'}"; + auto caseDropDatabase = "{case: {$ne: ['$o.dropDatabase', '$$REMOVE']}, then: '$$REMOVE'}"; + auto caseRenameCollection = "{case: {$ne: ['$o.renameCollection', '$$REMOVE']}, then: {$substrBytes: " "['$o.renameCollection', {$add: [{$strLenBytes: ['$$dbName']}, {$const: 1}]}, {$const: " "-1}]}}"; + auto caseCreate = "{case: {$ne: ['$o.create', '$$REMOVE']}, then: '$o.create'}"; + + const auto cases = boost::algorithm::join( + std::vector<std::string>{ + caseCRUD, caseNotCmd, caseDrop, caseDropDatabase, caseRenameCollection, caseCreate}, + ","); auto expectedExpr = fromjson( "{" @@ -6378,9 +6544,9 @@ TEST_F(ChangeStreamRewriteTest, CanRewriteNsWithExprOnCollFieldPath) { " in: {" " $switch: {" " branches: ["s + - " " + case1 + ", " + case2 + ", " + case3 + ", " + case4 + ", " + case5 + + cases + " ], default: '$$REMOVE'}}}}, " - " {$const: 'pipeline_test'}]}" + " {$const: 'pipeline_test'}]}" "}"); auto rewrittenPredicate = rewrittenMatchExpression->serialize(); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 7f90cfb5074..1f2fc8184cb 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -35,6 +35,7 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/pipeline/change_stream_document_diff_parser.h" +#include "mongo/db/pipeline/change_stream_filter_helpers.h" #include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/pipeline/document_path_support.h" @@ -64,6 +65,34 @@ using std::vector; namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; + +Document copyDocExceptFields(const Document& source, const std::set<StringData>& fieldNames) { + MutableDocument doc(source); + for (auto fieldName : fieldNames) { + doc.remove(fieldName); + } + return doc.freeze(); +} + +ResumeTokenData makeResumeToken(Value ts, + Value txnOpIndex, + Value uuid, + StringData 
operationType,
+                                Value documentKey,
+                                Value opDescription) {
+    ResumeTokenData resumeTokenData;
+    resumeTokenData.clusterTime = ts.getTimestamp();
+    if (!uuid.missing()) {
+        resumeTokenData.uuid = uuid.getUuid();
+    }
+    if (!txnOpIndex.missing()) {
+        resumeTokenData.txnOpIndex = txnOpIndex.getLong();
+    }
+    resumeTokenData.eventIdentifier =
+        ResumeToken::makeEventIdentifier(operationType, documentKey, opDescription);
+
+    return resumeTokenData;
+}
 }  // namespace
 
 REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamTransform,
@@ -134,26 +163,6 @@ StageConstraints DocumentSourceChangeStreamTransform::constraints(
     return constraints;
 }
 
-ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
-                                                                    Value uuid,
-                                                                    Value documentKey,
-                                                                    Value txnOpIndex) {
-    ResumeTokenData resumeTokenData;
-
-    resumeTokenData.clusterTime = ts.getTimestamp();
-    resumeTokenData.documentKey = documentKey;
-
-    if (!uuid.missing()) {
-        resumeTokenData.uuid = uuid.getUuid();
-    }
-
-    if (!txnOpIndex.missing()) {
-        resumeTokenData.txnOpIndex = txnOpIndex.getLong();
-    }
-
-    return resumeTokenData;
-}
-
 Document DocumentSourceChangeStreamTransform::applyTransformation(const Document& input) {
     // If we're executing a change stream pipeline that was forwarded from mongos, then we expect it
     // to "need merge"---we expect to be executing the shards part of a split pipeline. It is never
@@ -193,6 +202,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
     Value fullDocument;
     Value updateDescription;
     Value documentKey;
+    Value operationDescription;
 
     switch (opType) {
         case repl::OpTypeEnum::kInsert: {
@@ -289,20 +299,20 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
             break;
         }
         case repl::OpTypeEnum::kCommand: {
-            if (!input.getNestedField("o.drop").missing()) {
+            const auto oField = input[repl::OplogEntry::kObjectFieldName].getDocument();
+            if (auto nssField = oField.getField("drop"); !nssField.missing()) {
                 operationType = DocumentSourceChangeStream::kDropCollectionOpType;
 
                 // The "o.drop" field will contain the actual collection name.
-                nss = NamespaceString(nss.db(), input.getNestedField("o.drop").getString());
-            } else if (!input.getNestedField("o.renameCollection").missing()) {
+                nss = NamespaceString(nss.db(), nssField.getString());
+            } else if (auto nssField = oField.getField("renameCollection"); !nssField.missing()) {
                 operationType = DocumentSourceChangeStream::kRenameCollectionOpType;
 
                 // The "o.renameCollection" field contains the namespace of the original collection.
-                nss = NamespaceString(input.getNestedField("o.renameCollection").getString());
+                nss = NamespaceString(nssField.getString());
 
-                // The "o.to" field contains the target namespace for the rename.
-                const auto renameTargetNss =
-                    NamespaceString(input.getNestedField("o.to").getString());
+                // The "to" field contains the target namespace for the rename.
+                const auto renameTargetNss = NamespaceString(oField["to"].getString());
                 const Value renameTarget(Document{
                     {"db", renameTargetNss.db()},
                     {"coll", renameTargetNss.coll()},
@@ -315,27 +325,22 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
                 // If 'showExpandedEvents' is set, include full details of the rename in
                 // 'operationDescription'.
                 if (_changeStreamSpec.getShowExpandedEvents()) {
-                    MutableDocument operationDescription;
-                    operationDescription.addField(DocumentSourceChangeStream::kRenameTargetNssField,
-                                                  renameTarget);
-
-                    // If present, 'dropTarget' is the UUID of the collection that previously owned
-                    // the target namespace and was dropped during the rename operation.
-                    const auto dropTarget = input.getNestedField("o.dropTarget");
-                    if (!dropTarget.missing()) {
-                        checkValueType(dropTarget, "o.dropTarget", BSONType::BinData);
-                        operationDescription.addField("dropTarget", dropTarget);
-                    }
-
-                    doc.addField(DocumentSourceChangeStream::kOperationDescriptionField,
-                                 operationDescription.freezeToValue());
+                    MutableDocument opDescBuilder(
+                        copyDocExceptFields(oField, {"renameCollection"_sd, "stayTemp"_sd}));
+                    opDescBuilder.setField(DocumentSourceChangeStream::kRenameTargetNssField,
+                                           renameTarget);
+                    operationDescription = opDescBuilder.freezeToValue();
                 }
-            } else if (!input.getNestedField("o.dropDatabase").missing()) {
+            } else if (!oField.getField("dropDatabase").missing()) {
                 operationType = DocumentSourceChangeStream::kDropDatabaseOpType;
 
                 // Extract the database name from the namespace field and leave the collection name
                 // empty.
                 nss = NamespaceString(nss.db());
+            } else if (auto nssField = oField.getField("create"); !nssField.missing()) {
+                operationType = DocumentSourceChangeStream::kCreateOpType;
+                nss = NamespaceString(nss.db(), nssField.getString());
+                operationDescription = Value(copyDocExceptFields(oField, {"create"_sd}));
             } else {
                 // All other commands will invalidate the stream.
                 operationType = DocumentSourceChangeStream::kInvalidateOpType;
@@ -402,7 +407,8 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
     }
 
     // Generate the resume token. Note that only 'ts' is always guaranteed to be present.
-    auto resumeTokenData = getResumeToken(ts, uuid, documentKey, txnOpIndex);
+    auto resumeTokenData =
+        makeResumeToken(ts, txnOpIndex, uuid, operationType, documentKey, operationDescription);
     auto resumeToken = ResumeToken(resumeTokenData).toDocument();
 
     doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken));
@@ -463,7 +469,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
                      operationType == DocumentSourceChangeStream::kDropDatabaseOpType
                          ? Value(Document{{"db", nss.db()}})
                          : Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
+
+    // The event may have a documentKey OR an operationDescription, but not both. We already
+    // validated this while creating the resume token.
     doc.addField(DocumentSourceChangeStream::kDocumentKeyField, std::move(documentKey));
+    if (_changeStreamSpec.getShowExpandedEvents()) {
+        doc.addField(DocumentSourceChangeStream::kOperationDescriptionField, operationDescription);
+    }
 
     // Note that the update description field might be the 'missing' value, in which case it will
     // not be serialized.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index d9dcede8598..67bcdcdeecb 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -73,11 +73,6 @@ private:
     DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                         DocumentSourceChangeStreamSpec spec);
 
-    /**
-     * Helper used for determining what resume token to return.
-     */
-    ResumeTokenData getResumeToken(Value ts, Value uuid, Value documentKey, Value txnOpIndex);
-
     DocumentSourceChangeStreamSpec _changeStreamSpec;
 
     // Records the documentKey fields from the client's resume token, if present.
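Editor's note: the new copyDocExceptFields() helper in the transform stage is the core of how operationDescription is built: for a 'create' event it is simply the command object minus the 'create' field itself (rename similarly drops 'renameCollection' and 'stayTemp' before adding the rename target). A minimal standalone sketch, with std::map standing in for the Document classes:

```cpp
#include <iostream>
#include <map>
#include <set>
#include <string>

using Doc = std::map<std::string, std::string>;

// Stand-in for copyDocExceptFields(): copy the source document, dropping the
// named fields.
Doc copyDocExceptFields(const Doc& source, const std::set<std::string>& fieldNames) {
    Doc out = source;
    for (const auto& name : fieldNames) {
        out.erase(name);
    }
    return out;
}

int main() {
    Doc o{{"create", "coll"}, {"idIndex", "{v: 2, key: {_id: 1}}"}};
    Doc opDescription = copyDocExceptFields(o, {"create"});
    std::cout << opDescription.size() << "\n";  // 1: only 'idIndex' remains.
}
```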
+ static const std::set<std::string> excludedFields = {"clusterTime", "lsid", "txnNumber"}; + if (auto rewrittenMatch = + change_stream_rewrite::rewriteFilterForFields(expCtx, userMatch, {}, excludedFields)) { + unwindFilter->add(std::move(rewrittenMatch)); + } + return MatchExpression::optimize(std::move(unwindFilter)); +} +} // namespace change_stream_filter + boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction> DocumentSourceChangeStreamUnwindTransaction::create( const boost::intrusive_ptr<ExpressionContext>& expCtx) { - auto filter = - BSON("ns" << BSONRegEx(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns))); - return new DocumentSourceChangeStreamUnwindTransaction(std::move(filter), expCtx); + std::unique_ptr<MatchExpression> matchExpr = + change_stream_filter::buildUnwindTransactionFilter(expCtx, nullptr); + return new DocumentSourceChangeStreamUnwindTransaction(matchExpr->serialize(), expCtx); } boost::intrusive_ptr<DocumentSourceChangeStreamUnwindTransaction> @@ -394,37 +425,6 @@ void DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator:: } } -namespace change_stream_filter { -/** - * Build a filter, similar to the optimized oplog filter, designed to reject individual transaction - * entries that we know would eventually get rejected by the 'userMatch' filter if they continued - * through the rest of the pipeline. We must also adjust the filter slightly for user rewrites, as - * events within a transaction do not have certain fields that are common to other oplog entries. - * - * NB: The new filter may contain references to strings in the BSONObj that 'userMatch' originated - * from. Callers that keep the new filter long-term should serialize and re-parse it to guard - * against the possibility of stale string references. - */ -std::unique_ptr<MatchExpression> buildUnwindTransactionFilter( - const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch) { - // The transaction unwind filter is the same as the operation filter applied to the oplog. This - // includes a namespace filter, which ensures that it will discard all documents that would be - // filtered out by the default 'ns' filter this stage gets initialized with. - auto unwindFilter = std::make_unique<AndMatchExpression>(buildOperationFilter(expCtx, nullptr)); - - // Attempt to rewrite the user's filter and combine it with the standard operation filter. We do - // this separately because we need to exclude certain fields from the user's filters. Unwound - // transaction events do not have these fields until we populate them from the commitTransaction - // event. We already applied these predicates during the oplog scan, so we know that they match. - static const std::set<std::string> excludedFields = {"clusterTime", "lsid", "txnNumber"}; - if (auto rewrittenMatch = - change_stream_rewrite::rewriteFilterForFields(expCtx, userMatch, {}, excludedFields)) { - unwindFilter->add(std::move(rewrittenMatch)); - } - return MatchExpression::optimize(std::move(unwindFilter)); -} -} // namespace change_stream_filter - Pipeline::SourceContainer::iterator DocumentSourceChangeStreamUnwindTransaction::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { tassert(5687205, "Iterator mismatch during optimization", *itr == this); @@ -441,12 +441,6 @@ Pipeline::SourceContainer::iterator DocumentSourceChangeStreamUnwindTransaction: return nextChangeStreamStageItr; } - // We never expect to apply these optimizations more than once. 
The filter should be in its - // default state. - tassert(5902200, - "Multiple optimizations attempted", - _filter.nFields() == 1 && _filter.firstElementFieldNameStringData() == "ns"); - // Seek to the stage that immediately follows the change streams stages. itr = std::find_if_not(itr, container->end(), [](const auto& stage) { return stage->constraints().isChangeStreamStage(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 7c025a92766..afa2f70a422 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -535,7 +535,7 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfTokenHasSubsetOfDocumen TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyIsNonObject) { // Verify that a resume token whose documentKey is not a valid object will neither succeed nor - cause an invariant when we perform the relaxed documentKey._id check when running in a + cause an invariant when we perform the relaxed eventIdentifier._id check when running in a sharded context. Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; @@ -551,8 +551,8 @@ TEST_F(CheckResumeTokenTest, ShardedResumeFailsOnMongosIfDocumentKeyOmitsId) { // Verify that a resume token whose documentKey omits the _id field will neither succeed nor - cause an invariant when we perform the relaxed documentKey._id, even when compared against an - artificial stream token whose _id is also missing. + cause an invariant when we perform the relaxed eventIdentifier._id check, even when compared + against an artificial stream token whose _id is also missing.
Timestamp resumeTimestamp(100, 1); getExpCtx()->inMongos = true; @@ -607,7 +607,7 @@ TEST_F(CheckResumeTokenTest, ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData(); ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, resumeTimestamp); - ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey); + ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.eventIdentifier.getDocument(), expectedDocKey); } TEST_F(CheckResumeTokenTest, @@ -674,7 +674,7 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData(); ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, firstEventAfter); - ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey); + ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.eventIdentifier.getDocument(), expectedDocKey); } TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInvalidate) { @@ -750,7 +750,7 @@ TEST_F(CheckResumeTokenTest, ShouldSkipResumeTokensWithEarlierTxnOpIndex) { ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData(); ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, resumeTimestamp); - ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), expectedDocKey); + ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.eventIdentifier.getDocument(), expectedDocKey); } /** diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index 5d3a5e78d4c..cc612288fa9 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -37,6 +37,7 @@ #include "mongo/bson/bsonmisc.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/exec/document_value/value_comparator.h" +#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/storage/key_string.h" #include "mongo/util/hex.h" @@ -61,7 +62,7 @@ bool ResumeTokenData::operator==(const ResumeTokenData& other) const { return clusterTime == other.clusterTime && version == other.version && tokenType == other.tokenType && txnOpIndex == other.txnOpIndex && fromInvalidate == other.fromInvalidate && uuid == other.uuid && - (Value::compare(this->documentKey, other.documentKey, nullptr) == 0); + (Value::compare(this->eventIdentifier, other.eventIdentifier, nullptr) == 0); } std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { @@ -75,7 +76,7 @@ std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData) { out << ", fromInvalidate: " << static_cast<bool>(tokenData.fromInvalidate); } out << ", uuid: " << tokenData.uuid; - out << ", documentKey: " << tokenData.documentKey << "}"; + out << ", eventIdentifier: " << tokenData.eventIdentifier << "}"; return out; } @@ -96,7 +97,7 @@ ResumeToken::ResumeToken(const Document& resumeDoc) { } // We encode the resume token as a KeyString with the sequence: -// clusterTime, version, txnOpIndex, fromInvalidate, uuid, documentKey Only the clusterTime, +// clusterTime, version, txnOpIndex, fromInvalidate, uuid, eventIdentifier. Only the clusterTime, // version, txnOpIndex, and fromInvalidate are required.
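// As a concrete illustration of the sequence above (an editorial sketch, not code or text from this patch): a token for an expanded event such as 'create' conceptually carries {clusterTime: <ts>, version: 1, txnOpIndex: 0, fromInvalidate: false, uuid: <collection UUID>, eventIdentifier: {operationType: "create", operationDescription: {}}} before those fields are appended to a KeyString and hex-encoded.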
ResumeToken::ResumeToken(const ResumeTokenData& data) { BSONObjBuilder builder; @@ -110,13 +111,13 @@ ResumeToken::ResumeToken(const ResumeTokenData& data) { builder.appendBool("", data.fromInvalidate); } uassert(50788, - "Unexpected resume token with a documentKey but no UUID", - data.uuid || data.documentKey.missing()); + "Unexpected resume token with an eventIdentifier but no UUID", + data.uuid || data.eventIdentifier.missing()); if (data.uuid) { data.uuid->appendToBuilder(&builder, ""); } - data.documentKey.addToBsonObj(&builder, ""); + data.eventIdentifier.addToBsonObj(&builder, ""); auto keyObj = builder.obj(); KeyString::Builder encodedToken(KeyString::Version::V1, keyObj, Ordering::make(BSONObj())); _hexKeyString = hexblob::encode(encodedToken.getBuffer(), encodedToken.getSize()); @@ -130,9 +131,10 @@ bool ResumeToken::operator==(const ResumeToken& other) const { // '_hexKeyString' is enough to determine equality. The type bits are used to unambiguously // re-construct the original data, but we do not expect any two resume tokens to have the same // data and different type bits, since that would imply they have (1) the same timestamp and (2) - // the same documentKey (possibly different types). This should not be possible because - // documents with the same documentKey should be on the same shard and therefore should have - // different timestamps. + // the same eventIdentifier fields and values, but with different types. Change events with the + // same eventIdentifier are either (1) on the same shard in the case of CRUD events, which + // implies that they must have different timestamps; or (2) refer to the same logical event on + // different shards, in the case of non-CRUD events. return _hexKeyString == other._hexKeyString; } @@ -208,15 +210,15 @@ ResumeTokenData ResumeToken::getData() const { result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean()); } - // The UUID and documentKey are not required. + // The UUID and eventIdentifier are not required. if (!i.more()) { return result; } - // The UUID comes first, then the documentKey. + // The UUID comes first, then the eventIdentifier. result.uuid = uassertStatusOK(UUID::parse(i.next())); if (i.more()) { - result.documentKey = Value(i.next()); + result.eventIdentifier = Value(i.next()); } uassert(40646, "invalid oversized resume token", !i.more()); @@ -239,4 +241,19 @@ bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) { return tokenData == makeHighWaterMarkResumeTokenData(tokenData.clusterTime, tokenData.uuid); } +Value ResumeToken::makeEventIdentifier(StringData opType, Value documentKey, Value opDescription) { + tassert(6280100, + "documentKey and operationDescription cannot both be present for an event", + documentKey.missing() || opDescription.missing()); + + // For classic change events, the eventIdentifier is always the documentKey, even if missing. + if (change_stream_legacy::kClassicOperationTypes.count(opType)) { + return documentKey; + } + + // For an expanded event, the eventIdentifier is its operation type and description.
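+ // (Illustrative example, not a line from the original patch: a 'create' event therefore
+ // resumes with an identifier shaped like {operationType: "create",
+ // operationDescription: {<create options>}}.)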
+ return Value( + Document{{"operationType"_sd, opType}, {"operationDescription"_sd, opDescription}}); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index b5a7bddb218..99c018ab20a 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -62,18 +62,23 @@ struct ResumeTokenData { int versionIn, size_t txnOpIndexIn, const boost::optional<UUID>& uuidIn, - Value documentKeyIn) + Value eventIdentifierIn) : clusterTime(clusterTimeIn), version(versionIn), txnOpIndex(txnOpIndexIn), uuid(uuidIn), - documentKey(std::move(documentKeyIn)){}; + eventIdentifier(std::move(eventIdentifierIn)){}; bool operator==(const ResumeTokenData& other) const; bool operator!=(const ResumeTokenData& other) const { return !(*this == other); } + static Value makeEventIdentifierFromOpDescription(StringData opType, Value opDescription) { + return Value( + Document{{"operationType"_sd, opType}, {"operationDescription"_sd, opDescription}}); + } + Timestamp clusterTime; int version = 1; TokenType tokenType = TokenType::kEventToken; @@ -87,7 +92,10 @@ struct ResumeTokenData { // notification itself. FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate; boost::optional<UUID> uuid; - Value documentKey; + + // The eventIdentifier can either be a documentKey for CRUD operations, or a more + // descriptive operation description for non-CRUD operations. + Value eventIdentifier; }; std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData); @@ -121,6 +129,11 @@ public: static ResumeToken parse(const Document& document); /** + * Generates an appropriate event identifier for the given operationType. + */ + static Value makeEventIdentifier(StringData opType, Value documentKey, Value opDescription); + + /** * Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey. */ static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime); diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index 88be9cdc940..7e893cf2899 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -47,9 +47,10 @@ namespace { TEST(ResumeToken, EncodesFullTokenFromData) { Timestamp ts(1000, 2); UUID testUuid = UUID::gen(); - Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + Document eventIdentifier{{"_id"_sd, "stuff"_sd}, + {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(documentKey)); + ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(eventIdentifier)); ResumeToken token(resumeTokenDataIn); ResumeTokenData tokenData = token.getData(); ASSERT_EQ(resumeTokenDataIn, tokenData); @@ -68,9 +69,10 @@ TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) { TEST(ResumeToken, ShouldRoundTripThroughHexEncoding) { Timestamp ts(1000, 2); UUID testUuid = UUID::gen(); - Document documentKey{{"_id"_sd, "stuff"_sd}, {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; + Document eventIdentifier{{"_id"_sd, "stuff"_sd}, + {"otherkey"_sd, Document{{"otherstuff"_sd, 2}}}}; - ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(documentKey)); + ResumeTokenData resumeTokenDataIn(ts, 0, 0, testUuid, Value(eventIdentifier)); // Test serialization/parsing through Document.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument()); @@ -100,6 +102,27 @@ TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) { ASSERT_EQ(resumeTokenDataIn, tokenData); } +TEST(ResumeToken, NonDocumentKeyResumeTokenRoundTripsThroughHexEncoding) { + Timestamp ts(1001, 3); + + ResumeTokenData resumeTokenDataIn; + resumeTokenDataIn.clusterTime = ts; + resumeTokenDataIn.uuid = UUID::gen(); + resumeTokenDataIn.eventIdentifier = Value(BSON("operationType" + << "create" + << "operationDescription" << BSONObj())); + + // Test serialization/parsing through Document. + auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument()); + ResumeTokenData tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); + + // Test serialization/parsing through BSON. + rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); + tokenData = rtToken.getData(); + ASSERT_EQ(resumeTokenDataIn, tokenData); +} + TEST(ResumeToken, TestMissingTypebitsOptimization) { Timestamp ts(1000, 1); UUID testUuid = UUID::gen(); @@ -118,8 +141,8 @@ TEST(ResumeToken, TestMissingTypebitsOptimization) { auto rtNoTypeBitsData = ResumeToken::parse(noTypeBitsDoc).getData(); ASSERT_EQ(hasTypeBitsData, rtHasTypeBitsData); ASSERT_EQ(noTypeBitsData, rtNoTypeBitsData); - ASSERT_EQ(BSONType::NumberDouble, rtHasTypeBitsData.documentKey["_id"].getType()); - ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.documentKey["_id"].getType()); + ASSERT_EQ(BSONType::NumberDouble, rtHasTypeBitsData.eventIdentifier["_id"].getType()); + ASSERT_EQ(BSONType::NumberInt, rtNoTypeBitsData.eventIdentifier["_id"].getType()); } TEST(ResumeToken, FailsToParseForInvalidTokenFormats) { @@ -277,7 +300,7 @@ TEST(ResumeToken, InvalidTxnOpIndex) { TEST(ResumeToken, StringEncodingSortsCorrectly) { // Make sure that the string encoding of the resume tokens will compare in the correct order, - // namely timestamp, version, txnOpIndex, uuid, then documentKey. + // namely timestamp, version, txnOpIndex, uuid, then eventIdentifier. Timestamp ts2_2(2, 2); Timestamp ts10_4(10, 4); Timestamp ts10_5(10, 5); @@ -309,7 +332,8 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) { assertLt({ts2_2, 0, 0, boost::none, Value()}, {ts2_2, 1, 0, boost::none, Value()}); assertLt({ts10_4, 5, 0, boost::none, Value()}, {ts10_4, 10, 0, boost::none, Value()}); - // Test that the Timestamp is more important than the version, txnOpIndex, UUID and documentKey. + // Test that the Timestamp is more important than the version, txnOpIndex, UUID and + // eventIdentifier. assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, {ts10_5, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}); assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, @@ -347,7 +371,7 @@ TEST(ResumeToken, StringEncodingSortsCorrectly) { assertLt({ts10_4, 0, 0, lower_uuid, Value(Document{{"_id", 1}})}, {ts10_4, 0, 0, higher_uuid, Value(Document{{"_id", 2}})}); - // Test that when the Timestamp, version, txnOpIndex, and UUID are the same, the documentKey + // Test that when the Timestamp, version, txnOpIndex, and UUID are the same, the eventIdentifier // breaks the tie. assertLt({ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 0}})}, {ts2_2, 0, 0, lower_uuid, Value(Document{{"_id", 1}})});
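Taken together, the changes above reduce to one dispatch rule: classic events continue to use their documentKey as the eventIdentifier, while expanded events such as 'create' are identified by their operationType plus operationDescription. The following standalone sketch models that rule with simplified stand-in types; the Doc alias, the abridged kClassicOps list, and the sample values are illustrative assumptions rather than MongoDB source.

// Standalone sketch of the eventIdentifier dispatch described above. Stand-in types only.
#include <iostream>
#include <map>
#include <set>
#include <string>

// Stand-in for a BSON document: field name -> printable value.
using Doc = std::map<std::string, std::string>;

// Abridged stand-in for change_stream_legacy::kClassicOperationTypes.
static const std::set<std::string> kClassicOps = {"insert", "update", "replace", "delete"};

// Classic events keep their documentKey (even if empty); expanded events are identified
// by their operation type and a description of the operation.
Doc makeEventIdentifier(const std::string& opType,
                        const Doc& documentKey,
                        const std::string& opDescription) {
    if (kClassicOps.count(opType) > 0) {
        return documentKey;
    }
    return {{"operationType", opType}, {"operationDescription", opDescription}};
}

int main() {
    const Doc insertId = makeEventIdentifier("insert", {{"_id", "42"}}, "");
    const Doc createId = makeEventIdentifier("create", {}, "{}");
    std::cout << "insert -> _id: " << insertId.at("_id") << "\n";            // prints 42
    std::cout << "create -> type: " << createId.at("operationType") << "\n"; // prints create
    return 0;
}

This is also why the new NonDocumentKeyResumeTokenRoundTripsThroughHexEncoding test stores {operationType: "create", operationDescription: {}} as its eventIdentifier: a non-CRUD event has no documentKey, yet its identifier must still round-trip through the KeyString and hex encoding.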