diff options
4 files changed, 106 insertions, 66 deletions
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js index eda99611583..85ea4b5ed8c 100644 --- a/jstests/change_streams/change_stream_apply_ops.js +++ b/jstests/change_streams/change_stream_apply_ops.js @@ -7,8 +7,17 @@ load("jstests/libs/change_stream_util.js"); load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - function testChangeStreamsWithTransactions(watchWholeDb) { - let cst = new ChangeStreamTest(db); + var WatchMode = { + kCollection: 1, + kDb: 2, + kCluster: 3, + }; + + function testChangeStreamsWithTransactions(watchMode) { + let dbToStartTestOn = db; + if (watchMode == WatchMode.kCluster) { + dbToStartTestOn = db.getSiblingDB("admin"); + } const otherCollName = "change_stream_apply_ops_2"; const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); @@ -22,12 +31,16 @@ const kDeletedDocumentId = 0; coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"}); - let collArg = coll; - if (watchWholeDb) { - collArg = 1; + let cst = new ChangeStreamTest(dbToStartTestOn); + + let changeStream = null; + if (watchMode == WatchMode.kCluster) { + changeStream = cst.startWatchingAllChangesForCluster(); + } else { + const collArg = (watchMode == WatchMode.kCollection ? coll : 1); + changeStream = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collArg}); } - let changeStream = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collArg}); const sessionOptions = {causalConsistency: false}; const session = db.getMongo().startSession(sessionOptions); @@ -38,14 +51,14 @@ assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); - // One insert on a collection that we're not watching. This should be skipped in the change - // stream. + // One insert on a collection that we're not watching. This should be skipped by the + // single-collection changestream. assert.commandWorked( - sessionDb[otherCollName].insert({_id: 0, a: "Doc on other collection"})); + sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); - // One insert on a database we're not watching. Should also be skipped. + // This should be skipped by the single-collection and single-db changestreams. assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert( - {_id: 0, a: "SHOULD NOT READ THIS"})); + {_id: 222, a: "Doc on other DB"})); assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); @@ -63,9 +76,6 @@ ] })); - // Drop the collection. This will trigger an "invalidate" event. - assert.commandWorked(db.runCommand({drop: coll.getName()})); - // Check for the first insert. let change = cst.getOneChange(changeStream); assert.eq(change.fullDocument._id, 1); @@ -73,6 +83,8 @@ const firstChangeTxnNumber = change.txnNumber; const firstChangeLsid = change.lsid; assert.eq(typeof firstChangeLsid, "object"); + assert.eq(change.ns.coll, coll.getName()); + assert.eq(change.ns.db, db.getName()); // Check for the second insert. change = cst.getOneChange(changeStream); @@ -80,11 +92,13 @@ assert.eq(change.operationType, "insert", tojson(change)); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); + assert.eq(change.ns.coll, coll.getName()); + assert.eq(change.ns.db, db.getName()); - if (watchWholeDb) { + if (watchMode >= WatchMode.kDb) { // We should see the insert on the other collection. change = cst.getOneChange(changeStream); - assert.eq(change.fullDocument._id, 0); + assert.eq(change.fullDocument._id, 111); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); @@ -92,12 +106,25 @@ assert.eq(change.ns.db, db.getName()); } + if (watchMode >= WatchMode.kCluster) { + // We should see the insert on the other db. + change = cst.getOneChange(changeStream); + assert.eq(change.fullDocument._id, 222); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); + assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); + assert.eq(change.ns.coll, otherDbCollName); + assert.eq(change.ns.db, otherDbName); + } + // Check for the update. change = cst.getOneChange(changeStream); - assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1})); assert.eq(change.operationType, "update", tojson(change)); + assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1})); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); + assert.eq(change.ns.coll, coll.getName()); + assert.eq(change.ns.db, db.getName()); // Check for the delete. change = cst.getOneChange(changeStream); @@ -105,6 +132,11 @@ assert.eq(change.operationType, "delete", tojson(change)); assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber); assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid)); + assert.eq(change.ns.coll, coll.getName()); + assert.eq(change.ns.db, db.getName()); + + // Drop the collection. This will trigger an "invalidate" event. + assert.commandWorked(db.runCommand({drop: coll.getName()})); // The drop should have invalidated the change stream. cst.assertNextChangesEqual({ @@ -118,6 +150,7 @@ // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to // explicitly run both against a single collection and against the whole DB. - testChangeStreamsWithTransactions(false); - testChangeStreamsWithTransactions(true); + testChangeStreamsWithTransactions(WatchMode.kCollection); + testChangeStreamsWithTransactions(WatchMode.kDb); + testChangeStreamsWithTransactions(WatchMode.kCluster); }()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 40e12524f9e..f9d9b50e387 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -86,6 +86,10 @@ constexpr StringData DocumentSourceChangeStream::kInsertOpType; constexpr StringData DocumentSourceChangeStream::kInvalidateOpType; constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType; +constexpr StringData DocumentSourceChangeStream::kRegexAllCollections; +constexpr StringData DocumentSourceChangeStream::kRegexAllDBs; +constexpr StringData DocumentSourceChangeStream::kRegexCmdColl; + namespace { static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; @@ -262,21 +266,43 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) { } } // namespace +DocumentSourceChangeStream::ChangeStreamType DocumentSourceChangeStream::getChangeStreamType( + const NamespaceString& nss) { + + // If we have been permitted to run on admin, 'allChangesForCluster' must be true. + return (nss.isAdminDB() + ? ChangeStreamType::kAllChangesForCluster + : (nss.isCollectionlessAggregateNS() ? ChangeStreamType::kSingleDatabase + : ChangeStreamType::kSingleCollection)); +} + +std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const NamespaceString& nss) { + auto type = getChangeStreamType(nss); + switch (type) { + case ChangeStreamType::kSingleCollection: + // Match the target namespace exactly. + return "^" + nss.ns() + "$"; + case ChangeStreamType::kSingleDatabase: + // Match all namespaces that start with db name, followed by ".", then NOT followed by + // '$' or 'system.' + return "^" + nss.db() + kRegexAllCollections; + case ChangeStreamType::kAllChangesForCluster: + // Match all namespaces that start with any db name other than admin, config, or local, + // followed by ".", then NOT followed by '$' or 'system.' + return "^" + kRegexAllDBs + kRegexAllCollections; + default: + MONGO_UNREACHABLE; + } +} + + BSONObj DocumentSourceChangeStream::buildMatchFilter( const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, bool startFromInclusive) { auto nss = expCtx->ns; - // If we have been permitted to run on admin, 'allChangesForCluster' must be true. - ChangeStreamType sourceType = (nss.isAdminDB() ? ChangeStreamType::kAllChangesForCluster - : (nss.isCollectionlessAggregateNS() - ? ChangeStreamType::kSingleDatabase - : ChangeStreamType::kSingleCollection)); - // Regular expressions that match all oplog entries on supported databases and collections. - const auto regexAllCollections = R"(\.(?!(\$|system\.)))"_sd; - const auto regexAllDBs = "(?!(admin|config|local)).+"_sd; - const auto regexCmdColl = R"(\.\$cmd$)"_sd; + ChangeStreamType sourceType = getChangeStreamType(nss); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. BSONArrayBuilder invalidatingCommands; @@ -305,7 +331,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( // For cluster-wide $changeStream, match the command namespace of any database other than admin, // config, or local. Otherwise, match only against the target db's command namespace. auto cmdNsFilter = (sourceType == ChangeStreamType::kAllChangesForCluster - ? BSON("ns" << BSONRegEx("^" + regexAllDBs + regexCmdColl)) + ? BSON("ns" << BSONRegEx("^" + kRegexAllDBs + kRegexCmdColl)) : BSON("ns" << nss.getCommandNS().ns())); // 1.1) Commands that are on target db(s) and one of the above invalidating commands. @@ -332,22 +358,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( << "migrateChunkToNewShard"); // 2) Supported operations on the target namespace. - BSONObj nsMatch; - switch (sourceType) { - case ChangeStreamType::kSingleCollection: - // Match the target namespace exactly. - nsMatch = BSON("ns" << nss.ns()); - break; - case ChangeStreamType::kSingleDatabase: - // Match all namespaces that start with db name, followed by ".", then NOT followed by - // '$' or 'system.' - nsMatch = BSON("ns" << BSONRegEx("^" + nss.db() + regexAllCollections)); - break; - case ChangeStreamType::kAllChangesForCluster: - // Match all namespaces that start with any db name other than admin, config, or local, - // followed by ".", then NOT followed by '$' or 'system.' - nsMatch = BSON("ns" << BSONRegEx("^" + regexAllDBs + regexAllCollections)); - } + BSONObj nsMatch = BSON("ns" << BSONRegEx(getNsRegexForChangeStream(nss))); auto opMatch = BSON(nsMatch["ns"] << OR(normalOpTypeMatch, chunkMigratedMatch)); // 3) Look for 'applyOps' which were created as part of a transaction. @@ -449,13 +460,6 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::string DocumentSourceChangeStream::buildAllCollectionsRegex(const NamespaceString& nss) { - // Match all namespaces that start with db name, followed by ".", then not followed by - // '$' or 'system.' - static const auto regexAllCollections = R"(\.(?!(\$|system\.)))"; - return "^" + nss.db() + regexAllCollections; -} - list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { // A change stream is a tailable + awaitData cursor. diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index ca4b579b165..0912e79ebd7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -144,6 +144,16 @@ public: // Internal op type to signal mongos to open cursors on new shards. static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd; + enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster }; + + + /** + * Helpers for Determining which regex to match a change stream against. + */ + static ChangeStreamType getChangeStreamType(const NamespaceString& nss); + static std::string getNsRegexForChangeStream(const NamespaceString& nss); + + /** * Produce the BSON object representing the filter for the $match stage to filter oplog entries * to only those relevant for this $changeStream stage. @@ -152,8 +162,6 @@ public: Timestamp startFrom, bool startFromInclusive); - static std::string buildAllCollectionsRegex(const NamespaceString& nss); - /** * Parses a $changeStream stage from 'elem' and produces the $match and transformation * stages required. @@ -182,7 +190,9 @@ public: static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType); private: - enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster }; + static constexpr StringData kRegexAllCollections = R"(\.(?!(\$|system\.)))"_sd; + static constexpr StringData kRegexAllDBs = "(?!(admin|config|local)).+"_sd; + static constexpr StringData kRegexCmdColl = R"(\.\$cmd$)"_sd; // Helper function which throws if the $changeStream fails any of a series of semantic checks. // For instance, whether it is permitted to run given the current FCV, whether the namespace is diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 50620f898bd..922b01d00c2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -96,9 +96,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( : ResumeToken::SerializationFormat::kBinData), _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { - if (expCtx->ns.isCollectionlessAggregateNS()) { - _nsRegex.emplace(DocumentSourceChangeStream::buildAllCollectionsRegex(expCtx->ns)); - } + _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)); } DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints( @@ -308,6 +306,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // If we're in a sharded environment, we'll need to merge the results by their sort key, so add // that as metadata. if (pExpCtx->needsMerge) { + // TODO SERVER-34314: Sort key may have to be _id.data in FCV 4.0. doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); } @@ -402,13 +401,7 @@ bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) Value nsField = d["ns"]; invariant(!nsField.missing()); - if (_nsRegex) { - // Match all namespaces that start with db name, followed by ".", then not followed by - // '$' or 'system.' - return _nsRegex->PartialMatch(nsField.getString()); - } - - return nsField.getString() == pExpCtx->ns.ns(); + return _nsRegex->PartialMatch(nsField.getString()); } boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() { |