summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Boros <ian.boros@10gen.com>2018-04-12 18:06:31 -0400
committerIan Boros <ian.boros@10gen.com>2018-04-13 16:29:01 -0400
commit0c2990eb8780ee3d46b6d73ad3a43013943fc2c4 (patch)
tree4b131325b0e50adf23f409dbb936409d1854c72e
parent35c2a58bda1b90fbc25a3204290ca23c1dc7e00b (diff)
downloadmongo-0c2990eb8780ee3d46b6d73ad3a43013943fc2c4.tar.gz
SERVER-33114 part 2: support unwinding applyOps for cluster-wide change stream
-rw-r--r--jstests/change_streams/change_stream_apply_ops.js73
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp70
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h16
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp13
4 files changed, 106 insertions, 66 deletions
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js
index eda99611583..85ea4b5ed8c 100644
--- a/jstests/change_streams/change_stream_apply_ops.js
+++ b/jstests/change_streams/change_stream_apply_ops.js
@@ -7,8 +7,17 @@
load("jstests/libs/change_stream_util.js");
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- function testChangeStreamsWithTransactions(watchWholeDb) {
- let cst = new ChangeStreamTest(db);
+ var WatchMode = {
+ kCollection: 1,
+ kDb: 2,
+ kCluster: 3,
+ };
+
+ function testChangeStreamsWithTransactions(watchMode) {
+ let dbToStartTestOn = db;
+ if (watchMode == WatchMode.kCluster) {
+ dbToStartTestOn = db.getSiblingDB("admin");
+ }
const otherCollName = "change_stream_apply_ops_2";
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
@@ -22,12 +31,16 @@
const kDeletedDocumentId = 0;
coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"});
- let collArg = coll;
- if (watchWholeDb) {
- collArg = 1;
+ let cst = new ChangeStreamTest(dbToStartTestOn);
+
+ let changeStream = null;
+ if (watchMode == WatchMode.kCluster) {
+ changeStream = cst.startWatchingAllChangesForCluster();
+ } else {
+ const collArg = (watchMode == WatchMode.kCollection ? coll : 1);
+ changeStream =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collArg});
}
- let changeStream =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collArg});
const sessionOptions = {causalConsistency: false};
const session = db.getMongo().startSession(sessionOptions);
@@ -38,14 +51,14 @@
assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
- // One insert on a collection that we're not watching. This should be skipped in the change
- // stream.
+ // One insert on a collection that we're not watching. This should be skipped by the
+ // single-collection changestream.
assert.commandWorked(
- sessionDb[otherCollName].insert({_id: 0, a: "Doc on other collection"}));
+ sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
- // One insert on a database we're not watching. Should also be skipped.
+ // This should be skipped by the single-collection and single-db changestreams.
assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert(
- {_id: 0, a: "SHOULD NOT READ THIS"}));
+ {_id: 222, a: "Doc on other DB"}));
assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
@@ -63,9 +76,6 @@
]
}));
- // Drop the collection. This will trigger an "invalidate" event.
- assert.commandWorked(db.runCommand({drop: coll.getName()}));
-
// Check for the first insert.
let change = cst.getOneChange(changeStream);
assert.eq(change.fullDocument._id, 1);
@@ -73,6 +83,8 @@
const firstChangeTxnNumber = change.txnNumber;
const firstChangeLsid = change.lsid;
assert.eq(typeof firstChangeLsid, "object");
+ assert.eq(change.ns.coll, coll.getName());
+ assert.eq(change.ns.db, db.getName());
// Check for the second insert.
change = cst.getOneChange(changeStream);
@@ -80,11 +92,13 @@
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
+ assert.eq(change.ns.coll, coll.getName());
+ assert.eq(change.ns.db, db.getName());
- if (watchWholeDb) {
+ if (watchMode >= WatchMode.kDb) {
// We should see the insert on the other collection.
change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 0);
+ assert.eq(change.fullDocument._id, 111);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
@@ -92,12 +106,25 @@
assert.eq(change.ns.db, db.getName());
}
+ if (watchMode >= WatchMode.kCluster) {
+ // We should see the insert on the other db.
+ change = cst.getOneChange(changeStream);
+ assert.eq(change.fullDocument._id, 222);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
+ assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
+ assert.eq(change.ns.coll, otherDbCollName);
+ assert.eq(change.ns.db, otherDbName);
+ }
+
// Check for the update.
change = cst.getOneChange(changeStream);
- assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1}));
assert.eq(change.operationType, "update", tojson(change));
+ assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1}));
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
+ assert.eq(change.ns.coll, coll.getName());
+ assert.eq(change.ns.db, db.getName());
// Check for the delete.
change = cst.getOneChange(changeStream);
@@ -105,6 +132,11 @@
assert.eq(change.operationType, "delete", tojson(change));
assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
+ assert.eq(change.ns.coll, coll.getName());
+ assert.eq(change.ns.db, db.getName());
+
+ // Drop the collection. This will trigger an "invalidate" event.
+ assert.commandWorked(db.runCommand({drop: coll.getName()}));
// The drop should have invalidated the change stream.
cst.assertNextChangesEqual({
@@ -118,6 +150,7 @@
// TODO: SERVER-34302 should allow us to simplify this test, so we're not required to
// explicitly run both against a single collection and against the whole DB.
- testChangeStreamsWithTransactions(false);
- testChangeStreamsWithTransactions(true);
+ testChangeStreamsWithTransactions(WatchMode.kCollection);
+ testChangeStreamsWithTransactions(WatchMode.kDb);
+ testChangeStreamsWithTransactions(WatchMode.kCluster);
}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 40e12524f9e..f9d9b50e387 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -86,6 +86,10 @@ constexpr StringData DocumentSourceChangeStream::kInsertOpType;
constexpr StringData DocumentSourceChangeStream::kInvalidateOpType;
constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
+constexpr StringData DocumentSourceChangeStream::kRegexAllCollections;
+constexpr StringData DocumentSourceChangeStream::kRegexAllDBs;
+constexpr StringData DocumentSourceChangeStream::kRegexCmdColl;
+
namespace {
static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
@@ -262,21 +266,43 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) {
}
} // namespace
+DocumentSourceChangeStream::ChangeStreamType DocumentSourceChangeStream::getChangeStreamType(
+ const NamespaceString& nss) {
+
+ // If we have been permitted to run on admin, 'allChangesForCluster' must be true.
+ return (nss.isAdminDB()
+ ? ChangeStreamType::kAllChangesForCluster
+ : (nss.isCollectionlessAggregateNS() ? ChangeStreamType::kSingleDatabase
+ : ChangeStreamType::kSingleCollection));
+}
+
+std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const NamespaceString& nss) {
+ auto type = getChangeStreamType(nss);
+ switch (type) {
+ case ChangeStreamType::kSingleCollection:
+ // Match the target namespace exactly.
+ return "^" + nss.ns() + "$";
+ case ChangeStreamType::kSingleDatabase:
+ // Match all namespaces that start with db name, followed by ".", then NOT followed by
+ // '$' or 'system.'
+ return "^" + nss.db() + kRegexAllCollections;
+ case ChangeStreamType::kAllChangesForCluster:
+ // Match all namespaces that start with any db name other than admin, config, or local,
+ // followed by ".", then NOT followed by '$' or 'system.'
+ return "^" + kRegexAllDBs + kRegexAllCollections;
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+
BSONObj DocumentSourceChangeStream::buildMatchFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp startFrom,
bool startFromInclusive) {
auto nss = expCtx->ns;
- // If we have been permitted to run on admin, 'allChangesForCluster' must be true.
- ChangeStreamType sourceType = (nss.isAdminDB() ? ChangeStreamType::kAllChangesForCluster
- : (nss.isCollectionlessAggregateNS()
- ? ChangeStreamType::kSingleDatabase
- : ChangeStreamType::kSingleCollection));
- // Regular expressions that match all oplog entries on supported databases and collections.
- const auto regexAllCollections = R"(\.(?!(\$|system\.)))"_sd;
- const auto regexAllDBs = "(?!(admin|config|local)).+"_sd;
- const auto regexCmdColl = R"(\.\$cmd$)"_sd;
+ ChangeStreamType sourceType = getChangeStreamType(nss);
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
BSONArrayBuilder invalidatingCommands;
@@ -305,7 +331,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// For cluster-wide $changeStream, match the command namespace of any database other than admin,
// config, or local. Otherwise, match only against the target db's command namespace.
auto cmdNsFilter = (sourceType == ChangeStreamType::kAllChangesForCluster
- ? BSON("ns" << BSONRegEx("^" + regexAllDBs + regexCmdColl))
+ ? BSON("ns" << BSONRegEx("^" + kRegexAllDBs + kRegexCmdColl))
: BSON("ns" << nss.getCommandNS().ns()));
// 1.1) Commands that are on target db(s) and one of the above invalidating commands.
@@ -332,22 +358,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
<< "migrateChunkToNewShard");
// 2) Supported operations on the target namespace.
- BSONObj nsMatch;
- switch (sourceType) {
- case ChangeStreamType::kSingleCollection:
- // Match the target namespace exactly.
- nsMatch = BSON("ns" << nss.ns());
- break;
- case ChangeStreamType::kSingleDatabase:
- // Match all namespaces that start with db name, followed by ".", then NOT followed by
- // '$' or 'system.'
- nsMatch = BSON("ns" << BSONRegEx("^" + nss.db() + regexAllCollections));
- break;
- case ChangeStreamType::kAllChangesForCluster:
- // Match all namespaces that start with any db name other than admin, config, or local,
- // followed by ".", then NOT followed by '$' or 'system.'
- nsMatch = BSON("ns" << BSONRegEx("^" + regexAllDBs + regexAllCollections));
- }
+ BSONObj nsMatch = BSON("ns" << BSONRegEx(getNsRegexForChangeStream(nss)));
auto opMatch = BSON(nsMatch["ns"] << OR(normalOpTypeMatch, chunkMigratedMatch));
// 3) Look for 'applyOps' which were created as part of a transaction.
@@ -449,13 +460,6 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::string DocumentSourceChangeStream::buildAllCollectionsRegex(const NamespaceString& nss) {
- // Match all namespaces that start with db name, followed by ".", then not followed by
- // '$' or 'system.'
- static const auto regexAllCollections = R"(\.(?!(\$|system\.)))";
- return "^" + nss.db() + regexAllCollections;
-}
-
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
// A change stream is a tailable + awaitData cursor.
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index ca4b579b165..0912e79ebd7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -144,6 +144,16 @@ public:
// Internal op type to signal mongos to open cursors on new shards.
static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
+ enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster };
+
+
+ /**
+     * Helpers for determining which namespace regex a change stream should match against.
+ */
+ static ChangeStreamType getChangeStreamType(const NamespaceString& nss);
+ static std::string getNsRegexForChangeStream(const NamespaceString& nss);
+
+
/**
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
* to only those relevant for this $changeStream stage.
@@ -152,8 +162,6 @@ public:
Timestamp startFrom,
bool startFromInclusive);
- static std::string buildAllCollectionsRegex(const NamespaceString& nss);
-
/**
* Parses a $changeStream stage from 'elem' and produces the $match and transformation
* stages required.
@@ -182,7 +190,9 @@ public:
static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType);
private:
- enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster };
+ static constexpr StringData kRegexAllCollections = R"(\.(?!(\$|system\.)))"_sd;
+ static constexpr StringData kRegexAllDBs = "(?!(admin|config|local)).+"_sd;
+ static constexpr StringData kRegexCmdColl = R"(\.\$cmd$)"_sd;
// Helper function which throws if the $changeStream fails any of a series of semantic checks.
// For instance, whether it is permitted to run given the current FCV, whether the namespace is
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 50620f898bd..922b01d00c2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -96,9 +96,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
: ResumeToken::SerializationFormat::kBinData),
_isIndependentOfAnyCollection(isIndependentOfAnyCollection) {
- if (expCtx->ns.isCollectionlessAggregateNS()) {
- _nsRegex.emplace(DocumentSourceChangeStream::buildAllCollectionsRegex(expCtx->ns));
- }
+ _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns));
}
DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints(
@@ -308,6 +306,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// If we're in a sharded environment, we'll need to merge the results by their sort key, so add
// that as metadata.
if (pExpCtx->needsMerge) {
+ // TODO SERVER-34314: Sort key may have to be _id.data in FCV 4.0.
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
@@ -402,13 +401,7 @@ bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d)
Value nsField = d["ns"];
invariant(!nsField.missing());
- if (_nsRegex) {
- // Match all namespaces that start with db name, followed by ".", then not followed by
- // '$' or 'system.'
- return _nsRegex->PartialMatch(nsField.getString());
- }
-
- return nsField.getString() == pExpCtx->ns.ns();
+ return _nsRegex->PartialMatch(nsField.getString());
}
boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() {