summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrii Dobroshynski <andrii.dobroshynski@mongodb.com>2022-02-17 08:49:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-15 07:19:11 +0000
commit2572165d03ab73a8969dd60f5c0fbda20ad41aef (patch)
treea3bd0608729e11c166ffc1ce2cfa15d1de51531c
parentb7b6ca761e7939ddf75cdf96fa3f37ce5db73b7a (diff)
downloadmongo-2572165d03ab73a8969dd60f5c0fbda20ad41aef.tar.gz
SERVER-61893 Add capability for showing system events with showSystemEvents flag
-rw-r--r--jstests/change_streams/show_system_events.js258
-rw-r--r--jstests/libs/change_stream_util.js1
-rw-r--r--src/mongo/db/pipeline/aggregation_context_fixture.h1
-rw-r--r--src/mongo/db/pipeline/change_stream_filter_helpers.cpp14
-rw-r--r--src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp53
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h22
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.idl6
-rw-r--r--src/mongo/db/pipeline/expression_context.h4
-rw-r--r--src/mongo/shell/mongo.js10
10 files changed, 348 insertions, 23 deletions
diff --git a/jstests/change_streams/show_system_events.js b/jstests/change_streams/show_system_events.js
new file mode 100644
index 00000000000..ba7633fdcc0
--- /dev/null
+++ b/jstests/change_streams/show_system_events.js
@@ -0,0 +1,258 @@
+/**
+ * Tests the behavior of change streams in the presence of 'showSystemEvents' flag.
+ *
+ * @tags: [
+ * requires_fcv_60,
+ * # This test assumes certain ordering of events.
+ * assumes_unsharded_collection,
+ * # Assumes to implicit index creation.
+ * assumes_no_implicit_index_creation
+ * ]
+ */
+(function() {
+"use strict";
+
+load('jstests/libs/change_stream_util.js'); // For 'ChangeStreamTest' and
+ // 'assertChangeStreamEventEq'.
+load('jstests/libs/collection_drop_recreate.js'); // For 'assertDropCollection'.
+
+const testDB = db.getSiblingDB(jsTestName());
+
+if (!isChangeStreamsVisibilityEnabled(testDB)) {
+ assert.commandFailedWithCode(testDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {showSystemEvents: true}}],
+ cursor: {},
+ }),
+ 6189301);
+ return;
+}
+
+// Assert that the flag is not allowed with 'apiStrict'.
+assert.commandFailedWithCode(testDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {showSystemEvents: true}}],
+ cursor: {},
+ apiVersion: "1",
+ apiStrict: true
+}),
+ ErrorCodes.APIStrictError);
+
+const test = new ChangeStreamTest(testDB);
+
+const systemNS = {
+ db: testDB.getName(),
+ coll: 'system.js'
+};
+const collRenamed = 'collRenamed';
+
+function runWholeDbChangeStreamTestWithoutSystemEvents(test, cursor, nonSystemColl) {
+ assertDropCollection(testDB, nonSystemColl.getName());
+
+ let expected = {
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "drop",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Write to the 'normal' collection.
+ assert.commandWorked(nonSystemColl.insert({_id: 1, a: 1}));
+
+ // Insert a document into the system.js collection.
+ assert.commandWorked(testDB.system.js.insert({_id: 3, c: 1}));
+
+ // The next event should still be only the insert into the 'regular' collection, even though
+ // we've inserted into the system collection.
+ let expectedChanges = [
+ {
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "create",
+ },
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 1},
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "insert",
+ }
+ ];
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+
+ // Update into a system collection.
+ assert.commandWorked(testDB.system.js.update({_id: 3}, {c: 2}));
+ // Delete from a system collection.
+ assert.commandWorked(testDB.system.js.remove({_id: 3}));
+
+ // Rename the system collection.
+ assert.commandWorked(testDB.system.js.renameCollection(collRenamed));
+ // Rename back to system collection.
+ assert.commandWorked(testDB[collRenamed].renameCollection(systemNS.coll));
+
+ // We should see both renames because they involve a normal namespace. However, we don't see any
+ // of the preceding CRUD operations on the system collection.
+ expectedChanges = [
+ {
+ ns: {db: testDB.getName(), coll: systemNS.coll},
+ to: {db: testDB.getName(), coll: collRenamed},
+ operationType: "rename",
+ },
+ {
+ ns: {db: testDB.getName(), coll: collRenamed},
+ to: {db: testDB.getName(), coll: systemNS.coll},
+ operationType: "rename",
+ }
+ ];
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+
+ // Once again write to the 'normal' collection.
+ assert.commandWorked(nonSystemColl.insert({_id: 2, a: 1}));
+
+ // Similar as to before, the next event should be the insert on the 'regular' collection even
+ // though we have performed a number of operations on the system collection.
+ expected = {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, a: 1},
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "insert",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+}
+
+function runWholeDbChangeStreamTestWithSystemEvents(test, cursor, nonSystemColl) {
+ assertDropCollection(testDB, nonSystemColl.getName());
+
+ let expected = {
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "drop",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Write to the 'normal' collection.
+ assert.commandWorked(nonSystemColl.insert({_id: 1, a: 1}));
+
+ let expectedChanges = [
+ {
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "create",
+ },
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 1},
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "insert",
+ }
+ ];
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+
+ // Insert into a system collection.
+ assert.commandWorked(testDB.system.js.insert({_id: 2, b: 1}));
+
+ expected = {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, b: 1},
+ ns: {db: testDB.getName(), coll: systemNS.coll},
+ operationType: "insert",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Update into a system collection.
+ assert.commandWorked(testDB.system.js.update({_id: 2}, {b: 2}));
+
+ expected = {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, b: 2},
+ ns: {db: testDB.getName(), coll: systemNS.coll},
+ operationType: "replace",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Delete from a system collection.
+ assert.commandWorked(testDB.system.js.remove({_id: 2}));
+
+ expected = {
+ documentKey: {_id: 2},
+ ns: {db: testDB.getName(), coll: systemNS.coll},
+ operationType: "delete",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Rename the system collection.
+ assert.commandWorked(testDB.system.js.renameCollection(collRenamed));
+ // Rename back to system collection.
+ assert.commandWorked(testDB[collRenamed].renameCollection(systemNS.coll));
+
+ expectedChanges = [
+ {
+ ns: {db: testDB.getName(), coll: systemNS.coll},
+ to: {db: testDB.getName(), coll: collRenamed},
+ operationType: "rename",
+ },
+ {
+ ns: {db: testDB.getName(), coll: collRenamed},
+ to: {db: testDB.getName(), coll: systemNS.coll},
+ operationType: "rename",
+ }
+ ];
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+}
+
+function runSingleCollectionChangeStreamTest(test, cursor, nonSystemColl) {
+ // Write to the 'normal' collection.
+ assert.commandWorked(nonSystemColl.insert({_id: 1, a: 1}));
+
+ let expected = {
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "create",
+ };
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Insert into a system collection.
+ assert.commandWorked(testDB.system.js.insert({_id: 1, a: 1}));
+ // Update into a system collection.
+ assert.commandWorked(testDB.system.js.update({_id: 1}, {a: 2}));
+ // Delete from a system collection.
+ assert.commandWorked(testDB.system.js.remove({_id: 1}));
+
+ // Rename the system collection.
+ assert.commandWorked(testDB.system.js.renameCollection(collRenamed));
+ // Rename back to system collection.
+ assert.commandWorked(testDB[collRenamed].renameCollection(systemNS.coll));
+
+ // Write again to the 'normal' collection as a sentinel write.
+ assert.commandWorked(nonSystemColl.insert({_id: 2, a: 2}));
+
+ // The only expected events should be the two inserts into the non-system collection.
+ const expectedChanges = [
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 1},
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, a: 2},
+ ns: {db: testDB.getName(), coll: nonSystemColl.getName()},
+ operationType: "insert",
+ }
+ ];
+ test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges});
+}
+
+const regularColl = testDB.test_coll;
+regularColl.drop();
+
+// Run a single-collection stream on a normal collection with 'showSystemEvents' set to 'true'.
+let pipeline = [{$changeStream: {showExpandedEvents: true, showSystemEvents: true}}];
+let cursor = test.startWatchingChanges({pipeline: pipeline, collection: regularColl});
+runSingleCollectionChangeStreamTest(test, cursor, regularColl);
+
+// Run a whole-DB stream with 'showSystemEvents' set to 'true'.
+pipeline = [{$changeStream: {showExpandedEvents: true, showSystemEvents: true}}];
+cursor = test.startWatchingChanges({pipeline: pipeline, collection: 1});
+runWholeDbChangeStreamTestWithSystemEvents(test, cursor, regularColl);
+
+// Now run a whole-DB stream with 'showSystemEvents' set to 'false'.
+pipeline = [{$changeStream: {showExpandedEvents: true, showSystemEvents: false}}];
+cursor = test.startWatchingChanges({pipeline: pipeline, collection: 1});
+runWholeDbChangeStreamTestWithoutSystemEvents(test, cursor, regularColl);
+}()); \ No newline at end of file
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index a6697bf5b03..956e4e83bc2 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -192,6 +192,7 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
* 'aggregateOptions' if provided and saves the cursor so that it can be cleaned up later.
*/
self.startWatchingAllChangesForCluster = function(aggregateOptions) {
+ assert.eq(_db.getName(), "admin");
return self.startWatchingChanges({
pipeline: [{$changeStream: {allChangesForCluster: true}}],
collection: 1,
diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h
index f6060260455..a4dfcfeb769 100644
--- a/src/mongo/db/pipeline/aggregation_context_fixture.h
+++ b/src/mongo/db/pipeline/aggregation_context_fixture.h
@@ -56,6 +56,7 @@ public:
_expCtx = make_intrusive<ExpressionContextForTest>(_opCtx.get(), nss);
unittest::TempDir tempDir("AggregationContextFixture");
_expCtx->tempDir = tempDir.path();
+ _expCtx->changeStreamSpec = DocumentSourceChangeStreamSpec();
}
auto getExpCtx() {
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 73e4eb3b26f..e0016fa00ad 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -56,9 +56,9 @@ std::unique_ptr<MatchExpression> buildOperationFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const MatchExpression* userMatch) {
// Regexes to match each of the necessary namespace components for the current stream type.
- auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns);
- auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns);
- auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx->ns);
+ auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx);
+ auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx);
+ auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx);
auto streamType = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns);
@@ -192,9 +192,9 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
BSONArrayBuilder orBuilder(applyOpsBuilder.subarrayStart("$or"));
{
// Regexes for full-namespace, collection, and command-namespace matching.
- auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns);
- auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns);
- auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx->ns);
+ auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx);
+ auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx);
+ auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx);
// Match relevant CRUD events on the monitored namespaces.
orBuilder.append(BSON("o.applyOps.ns" << BSONRegEx(nsRegex)));
@@ -253,7 +253,7 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
}
// Also filter for shardCollection events, which are recorded as {op: 'n'} in the oplog.
- auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns);
+ auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx);
internalOpTypeOrBuilder.append(BSON("o2.shardCollection" << BSONRegEx(nsRegex)));
// Finalize the array of $or filter predicates.
diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
index e5324b9cd88..071ec300587 100644
--- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp
@@ -820,7 +820,7 @@ std::unique_ptr<MatchExpression> matchRewriteGenericNamespace(
if (fieldName == "db") {
return "^" +
DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) +
- "\\." + DocumentSourceChangeStream::kRegexAllCollections;
+ "\\." + DocumentSourceChangeStream::resolveAllCollectionsRegex(expCtx);
}
return DocumentSourceChangeStream::kRegexAllDBs + "\\." +
DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) +
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a5f8f7eb3a0..37d818ca7a4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -105,6 +105,8 @@ constexpr StringData DocumentSourceChangeStream::kReshardDoneCatchUpOpType;
constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
constexpr StringData DocumentSourceChangeStream::kRegexAllCollections;
+constexpr StringData DocumentSourceChangeStream::kRegexAllCollectionsShowSystemEvents;
+
constexpr StringData DocumentSourceChangeStream::kRegexAllDBs;
constexpr StringData DocumentSourceChangeStream::kRegexCmdColl;
@@ -127,28 +129,44 @@ DocumentSourceChangeStream::ChangeStreamType DocumentSourceChangeStream::getChan
: ChangeStreamType::kSingleCollection));
}
-std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const NamespaceString& nss) {
- auto type = getChangeStreamType(nss);
+StringData DocumentSourceChangeStream::resolveAllCollectionsRegex(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ // We never expect this method to be called except when building a change stream pipeline.
+ tassert(6189300,
+ "Expected change stream spec to be set on the expression context",
+ expCtx->changeStreamSpec);
+ // If 'showSystemEvents' is set, return a less stringent regex.
+ return (expCtx->changeStreamSpec->getShowSystemEvents()
+ ? DocumentSourceChangeStream::kRegexAllCollectionsShowSystemEvents
+ : DocumentSourceChangeStream::kRegexAllCollections);
+}
+
+std::string DocumentSourceChangeStream::getNsRegexForChangeStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ const auto type = getChangeStreamType(expCtx->ns);
+ const auto& nss = expCtx->ns;
switch (type) {
case ChangeStreamType::kSingleCollection:
// Match the target namespace exactly.
return "^" + regexEscapeNsForChangeStream(nss.ns()) + "$";
case ChangeStreamType::kSingleDatabase:
// Match all namespaces that start with db name, followed by ".", then NOT followed by
- // '$' or 'system.'
+ // '$' or 'system.' unless 'showSystemEvents' is set.
return "^" + regexEscapeNsForChangeStream(nss.db().toString()) + "\\." +
- kRegexAllCollections;
+ resolveAllCollectionsRegex(expCtx);
case ChangeStreamType::kAllChangesForCluster:
// Match all namespaces that start with any db name other than admin, config, or local,
- // followed by ".", then NOT followed by '$' or 'system.'.
- return kRegexAllDBs + "\\." + kRegexAllCollections;
+ // followed by ".", then NOT '$' or 'system.' unless 'showSystemEvents' is set.
+ return kRegexAllDBs + "\\." + resolveAllCollectionsRegex(expCtx);
default:
MONGO_UNREACHABLE;
}
}
-std::string DocumentSourceChangeStream::getCollRegexForChangeStream(const NamespaceString& nss) {
- auto type = getChangeStreamType(nss);
+std::string DocumentSourceChangeStream::getCollRegexForChangeStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ const auto type = getChangeStreamType(expCtx->ns);
+ const auto& nss = expCtx->ns;
switch (type) {
case ChangeStreamType::kSingleCollection:
// Match the target collection exactly.
@@ -156,14 +174,16 @@ std::string DocumentSourceChangeStream::getCollRegexForChangeStream(const Namesp
case ChangeStreamType::kSingleDatabase:
case ChangeStreamType::kAllChangesForCluster:
// Match any collection; database filtering will be done elsewhere.
- return "^" + kRegexAllCollections;
+ return "^" + resolveAllCollectionsRegex(expCtx);
default:
MONGO_UNREACHABLE;
}
}
-std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(const NamespaceString& nss) {
- auto type = getChangeStreamType(nss);
+std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ const auto type = getChangeStreamType(expCtx->ns);
+ const auto& nss = expCtx->ns;
switch (type) {
case ChangeStreamType::kSingleCollection:
case ChangeStreamType::kSingleDatabase:
@@ -232,6 +252,9 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// Make sure that it is legal to run this $changeStream before proceeding.
DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec);
+ // Save a copy of the spec on the expression context. Used when building the oplog filter.
+ expCtx->changeStreamSpec = spec;
+
// If we see this stage on a shard, it means that the raw $changeStream stage was dispatched to
// us from an old mongoS. Build a legacy shard pipeline.
if (expCtx->needsMerge) {
@@ -376,7 +399,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
uassert(6188501,
"the 'featureFlagChangeStreamsVisibility' should be enabled to use "
- "'showEnhancedEvents:true' in the change stream spec",
+ "'showExpandedEvents:true' in the change stream spec",
feature_flags::gFeatureFlagChangeStreamsVisibility.isEnabledAndIgnoreFCV() ||
!spec.getShowExpandedEvents());
@@ -386,6 +409,12 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
feature_flags::gFeatureFlagChangeStreamsVisibility.isEnabledAndIgnoreFCV() ||
!spec.getShowRawUpdateDescription());
+ uassert(6189301,
+ "the 'featureFlagChangeStreamsVisibility' should be enabled to use "
+ "'showSystemEvents:true' in the change stream spec",
+ feature_flags::gFeatureFlagChangeStreamsVisibility.isEnabledAndIgnoreFCV() ||
+ !spec.getShowSystemEvents());
+
uassert(31123,
"Change streams from mongos may not show migration events",
!(expCtx->inMongos && spec.getShowMigrationEvents()));
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 93831c6b8a5..ae281596679 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -121,6 +121,12 @@ public:
"API Version 1",
_spec.Obj()[DocumentSourceChangeStreamSpec::kShowRawUpdateDescriptionFieldName]
.eoo());
+
+ uassert(
+ ErrorCodes::APIStrictError,
+ "The 'showSystemEvents' parameter to $changeStream is not supported in API "
+ "Version 1",
+ _spec.Obj()[DocumentSourceChangeStreamSpec::kShowSystemEventsFieldName].eoo());
}
}
@@ -229,7 +235,12 @@ public:
static constexpr StringData kDropIndexesOpType = "dropIndexes"_sd;
static constexpr StringData kShardCollectionOpType = "shardCollection"_sd;
+ // Default regex for collections match which prohibits system collections.
static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
+ // Regex matching all regular collections plus certain system collections.
+ static constexpr StringData kRegexAllCollectionsShowSystemEvents =
+ R"((?!(\$|system\.(?!(js$)))))"_sd;
+
static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd;
static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd;
@@ -239,9 +250,14 @@ public:
* Helpers for Determining which regex to match a change stream against.
*/
static ChangeStreamType getChangeStreamType(const NamespaceString& nss);
- static std::string getNsRegexForChangeStream(const NamespaceString& nss);
- static std::string getCollRegexForChangeStream(const NamespaceString& nss);
- static std::string getCmdNsRegexForChangeStream(const NamespaceString& nss);
+ static std::string getNsRegexForChangeStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static std::string getCollRegexForChangeStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static std::string getCmdNsRegexForChangeStream(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static StringData resolveAllCollectionsRegex(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
static std::string regexEscapeNsForChangeStream(StringData source);
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index 69abc61d4a8..e1b8227a0fc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -150,6 +150,12 @@ structs:
of data. Instead they reflect this data moving from one shard to
another.
+ showSystemEvents:
+ cpp_name: showSystemEvents
+ type: optionalBool
+ description: A flag indicating whether the stream should report events on system
+ collections.
+
allowToRunOnConfigDB:
cpp_name: allowToRunOnConfigDB
type: optionalBool
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index c449888f726..9334cd4ecff 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -41,6 +41,7 @@
#include "mongo/db/exec/document_value/value_comparator.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/javascript_execution.h"
#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
@@ -451,6 +452,9 @@ public:
// When non-empty, contains the unmodified user provided aggregation command.
BSONObj originalAggregateCommand;
+ // If present, the spec associated with the current change stream pipeline.
+ boost::optional<DocumentSourceChangeStreamSpec> changeStreamSpec;
+
// True if the expression context is the original one for a given pipeline.
// False if another context is created for the same pipeline. Used to disable duplicate
// expression counting.
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index 0c60275819e..d19d47493ab 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -600,6 +600,16 @@ Mongo.prototype._extractChangeStreamOptions = function(options) {
delete options.showExpandedEvents;
}
+ if (options.hasOwnProperty("showSystemEvents")) {
+ changeStreamOptions.showSystemEvents = options.showSystemEvents;
+ delete options.showSystemEvents;
+ }
+
+ if (options.hasOwnProperty("showRawUpdateDescription")) {
+ changeStreamOptions.showRawUpdateDescription = options.showRawUpdateDescription;
+ delete options.showRawUpdateDescription;
+ }
+
return [{$changeStream: changeStreamOptions}, options];
};