From 2572165d03ab73a8969dd60f5c0fbda20ad41aef Mon Sep 17 00:00:00 2001 From: Andrii Dobroshynski Date: Thu, 17 Feb 2022 08:49:56 +0000 Subject: SERVER-61893 Add capability for showing system events with showSystemEvents flag --- jstests/change_streams/show_system_events.js | 258 +++++++++++++++++++++ jstests/libs/change_stream_util.js | 1 + .../db/pipeline/aggregation_context_fixture.h | 1 + .../db/pipeline/change_stream_filter_helpers.cpp | 14 +- .../db/pipeline/change_stream_rewrite_helpers.cpp | 2 +- .../db/pipeline/document_source_change_stream.cpp | 53 ++++- .../db/pipeline/document_source_change_stream.h | 22 +- .../db/pipeline/document_source_change_stream.idl | 6 + src/mongo/db/pipeline/expression_context.h | 4 + src/mongo/shell/mongo.js | 10 + 10 files changed, 348 insertions(+), 23 deletions(-) create mode 100644 jstests/change_streams/show_system_events.js diff --git a/jstests/change_streams/show_system_events.js b/jstests/change_streams/show_system_events.js new file mode 100644 index 00000000000..ba7633fdcc0 --- /dev/null +++ b/jstests/change_streams/show_system_events.js @@ -0,0 +1,258 @@ +/** + * Tests the behavior of change streams in the presence of 'showSystemEvents' flag. + * + * @tags: [ + * requires_fcv_60, + * # This test assumes certain ordering of events. + * assumes_unsharded_collection, + * # Assumes to implicit index creation. + * assumes_no_implicit_index_creation + * ] + */ +(function() { +"use strict"; + +load('jstests/libs/change_stream_util.js'); // For 'ChangeStreamTest' and + // 'assertChangeStreamEventEq'. +load('jstests/libs/collection_drop_recreate.js'); // For 'assertDropCollection'. + +const testDB = db.getSiblingDB(jsTestName()); + +if (!isChangeStreamsVisibilityEnabled(testDB)) { + assert.commandFailedWithCode(testDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {showSystemEvents: true}}], + cursor: {}, + }), + 6189301); + return; +} + +// Assert that the flag is not allowed with 'apiStrict'. +assert.commandFailedWithCode(testDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {showSystemEvents: true}}], + cursor: {}, + apiVersion: "1", + apiStrict: true +}), + ErrorCodes.APIStrictError); + +const test = new ChangeStreamTest(testDB); + +const systemNS = { + db: testDB.getName(), + coll: 'system.js' +}; +const collRenamed = 'collRenamed'; + +function runWholeDbChangeStreamTestWithoutSystemEvents(test, cursor, nonSystemColl) { + assertDropCollection(testDB, nonSystemColl.getName()); + + let expected = { + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "drop", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Write to the 'normal' collection. + assert.commandWorked(nonSystemColl.insert({_id: 1, a: 1})); + + // Insert a document into the system.js collection. + assert.commandWorked(testDB.system.js.insert({_id: 3, c: 1})); + + // The next event should still be only the insert into the 'regular' collection, even though + // we've inserted into the system collection. + let expectedChanges = [ + { + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "create", + }, + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 1}, + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "insert", + } + ]; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges}); + + // Update into a system collection. + assert.commandWorked(testDB.system.js.update({_id: 3}, {c: 2})); + // Delete from a system collection. + assert.commandWorked(testDB.system.js.remove({_id: 3})); + + // Rename the system collection. + assert.commandWorked(testDB.system.js.renameCollection(collRenamed)); + // Rename back to system collection. + assert.commandWorked(testDB[collRenamed].renameCollection(systemNS.coll)); + + // We should see both renames because they involve a normal namespace. However, we don't see any + // of the preceding CRUD operations on the system collection. + expectedChanges = [ + { + ns: {db: testDB.getName(), coll: systemNS.coll}, + to: {db: testDB.getName(), coll: collRenamed}, + operationType: "rename", + }, + { + ns: {db: testDB.getName(), coll: collRenamed}, + to: {db: testDB.getName(), coll: systemNS.coll}, + operationType: "rename", + } + ]; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges}); + + // Once again write to the 'normal' collection. + assert.commandWorked(nonSystemColl.insert({_id: 2, a: 1})); + + // Similar as to before, the next event should be the insert on the 'regular' collection even + // though we have performed a number of operations on the system collection. + expected = { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 1}, + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "insert", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); +} + +function runWholeDbChangeStreamTestWithSystemEvents(test, cursor, nonSystemColl) { + assertDropCollection(testDB, nonSystemColl.getName()); + + let expected = { + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "drop", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Write to the 'normal' collection. + assert.commandWorked(nonSystemColl.insert({_id: 1, a: 1})); + + let expectedChanges = [ + { + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "create", + }, + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 1}, + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "insert", + } + ]; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges}); + + // Insert into a system collection. + assert.commandWorked(testDB.system.js.insert({_id: 2, b: 1})); + + expected = { + documentKey: {_id: 2}, + fullDocument: {_id: 2, b: 1}, + ns: {db: testDB.getName(), coll: systemNS.coll}, + operationType: "insert", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Update into a system collection. + assert.commandWorked(testDB.system.js.update({_id: 2}, {b: 2})); + + expected = { + documentKey: {_id: 2}, + fullDocument: {_id: 2, b: 2}, + ns: {db: testDB.getName(), coll: systemNS.coll}, + operationType: "replace", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Delete from a system collection. + assert.commandWorked(testDB.system.js.remove({_id: 2})); + + expected = { + documentKey: {_id: 2}, + ns: {db: testDB.getName(), coll: systemNS.coll}, + operationType: "delete", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Rename the system collection. + assert.commandWorked(testDB.system.js.renameCollection(collRenamed)); + // Rename back to system collection. + assert.commandWorked(testDB[collRenamed].renameCollection(systemNS.coll)); + + expectedChanges = [ + { + ns: {db: testDB.getName(), coll: systemNS.coll}, + to: {db: testDB.getName(), coll: collRenamed}, + operationType: "rename", + }, + { + ns: {db: testDB.getName(), coll: collRenamed}, + to: {db: testDB.getName(), coll: systemNS.coll}, + operationType: "rename", + } + ]; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges}); +} + +function runSingleCollectionChangeStreamTest(test, cursor, nonSystemColl) { + // Write to the 'normal' collection. + assert.commandWorked(nonSystemColl.insert({_id: 1, a: 1})); + + let expected = { + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "create", + }; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Insert into a system collection. + assert.commandWorked(testDB.system.js.insert({_id: 1, a: 1})); + // Update into a system collection. + assert.commandWorked(testDB.system.js.update({_id: 1}, {a: 2})); + // Delete from a system collection. + assert.commandWorked(testDB.system.js.remove({_id: 1})); + + // Rename the system collection. + assert.commandWorked(testDB.system.js.renameCollection(collRenamed)); + // Rename back to system collection. + assert.commandWorked(testDB[collRenamed].renameCollection(systemNS.coll)); + + // Write again to the 'normal' collection as a sentinel write. + assert.commandWorked(nonSystemColl.insert({_id: 2, a: 2})); + + // The only expected events should be the two inserts into the non-system collection. + const expectedChanges = [ + { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 1}, + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 2}, + ns: {db: testDB.getName(), coll: nonSystemColl.getName()}, + operationType: "insert", + } + ]; + test.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges}); +} + +const regularColl = testDB.test_coll; +regularColl.drop(); + +// Run a single-collection stream on a normal collection with 'showSystemEvents' set to 'true'. +let pipeline = [{$changeStream: {showExpandedEvents: true, showSystemEvents: true}}]; +let cursor = test.startWatchingChanges({pipeline: pipeline, collection: regularColl}); +runSingleCollectionChangeStreamTest(test, cursor, regularColl); + +// Run a whole-DB stream with 'showSystemEvents' set to 'true'. +pipeline = [{$changeStream: {showExpandedEvents: true, showSystemEvents: true}}]; +cursor = test.startWatchingChanges({pipeline: pipeline, collection: 1}); +runWholeDbChangeStreamTestWithSystemEvents(test, cursor, regularColl); + +// Now run a whole-DB stream with 'showSystemEvents' set to 'false'. +pipeline = [{$changeStream: {showExpandedEvents: true, showSystemEvents: false}}]; +cursor = test.startWatchingChanges({pipeline: pipeline, collection: 1}); +runWholeDbChangeStreamTestWithoutSystemEvents(test, cursor, regularColl); +}()); \ No newline at end of file diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index a6697bf5b03..956e4e83bc2 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -192,6 +192,7 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * 'aggregateOptions' if provided and saves the cursor so that it can be cleaned up later. */ self.startWatchingAllChangesForCluster = function(aggregateOptions) { + assert.eq(_db.getName(), "admin"); return self.startWatchingChanges({ pipeline: [{$changeStream: {allChangesForCluster: true}}], collection: 1, diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h index f6060260455..a4dfcfeb769 100644 --- a/src/mongo/db/pipeline/aggregation_context_fixture.h +++ b/src/mongo/db/pipeline/aggregation_context_fixture.h @@ -56,6 +56,7 @@ public: _expCtx = make_intrusive(_opCtx.get(), nss); unittest::TempDir tempDir("AggregationContextFixture"); _expCtx->tempDir = tempDir.path(); + _expCtx->changeStreamSpec = DocumentSourceChangeStreamSpec(); } auto getExpCtx() { diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index 73e4eb3b26f..e0016fa00ad 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -56,9 +56,9 @@ std::unique_ptr buildOperationFilter( const boost::intrusive_ptr& expCtx, const MatchExpression* userMatch) { // Regexes to match each of the necessary namespace components for the current stream type. - auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns); - auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns); - auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx->ns); + auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx); + auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx); + auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx); auto streamType = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns); @@ -192,9 +192,9 @@ std::unique_ptr buildTransactionFilter( BSONArrayBuilder orBuilder(applyOpsBuilder.subarrayStart("$or")); { // Regexes for full-namespace, collection, and command-namespace matching. - auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns); - auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx->ns); - auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx->ns); + auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx); + auto collRegex = DocumentSourceChangeStream::getCollRegexForChangeStream(expCtx); + auto cmdNsRegex = DocumentSourceChangeStream::getCmdNsRegexForChangeStream(expCtx); // Match relevant CRUD events on the monitored namespaces. orBuilder.append(BSON("o.applyOps.ns" << BSONRegEx(nsRegex))); @@ -253,7 +253,7 @@ std::unique_ptr buildInternalOpFilter( } // Also filter for shardCollection events, which are recorded as {op: 'n'} in the oplog. - auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns); + auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx); internalOpTypeOrBuilder.append(BSON("o2.shardCollection" << BSONRegEx(nsRegex))); // Finalize the array of $or filter predicates. diff --git a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp index e5324b9cd88..071ec300587 100644 --- a/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_rewrite_helpers.cpp @@ -820,7 +820,7 @@ std::unique_ptr matchRewriteGenericNamespace( if (fieldName == "db") { return "^" + DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) + - "\\." + DocumentSourceChangeStream::kRegexAllCollections; + "\\." + DocumentSourceChangeStream::resolveAllCollectionsRegex(expCtx); } return DocumentSourceChangeStream::kRegexAllDBs + "\\." + DocumentSourceChangeStream::regexEscapeNsForChangeStream(nsElem.str()) + diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a5f8f7eb3a0..37d818ca7a4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -105,6 +105,8 @@ constexpr StringData DocumentSourceChangeStream::kReshardDoneCatchUpOpType; constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType; constexpr StringData DocumentSourceChangeStream::kRegexAllCollections; +constexpr StringData DocumentSourceChangeStream::kRegexAllCollectionsShowSystemEvents; + constexpr StringData DocumentSourceChangeStream::kRegexAllDBs; constexpr StringData DocumentSourceChangeStream::kRegexCmdColl; @@ -127,28 +129,44 @@ DocumentSourceChangeStream::ChangeStreamType DocumentSourceChangeStream::getChan : ChangeStreamType::kSingleCollection)); } -std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const NamespaceString& nss) { - auto type = getChangeStreamType(nss); +StringData DocumentSourceChangeStream::resolveAllCollectionsRegex( + const boost::intrusive_ptr& expCtx) { + // We never expect this method to be called except when building a change stream pipeline. + tassert(6189300, + "Expected change stream spec to be set on the expression context", + expCtx->changeStreamSpec); + // If 'showSystemEvents' is set, return a less stringent regex. + return (expCtx->changeStreamSpec->getShowSystemEvents() + ? DocumentSourceChangeStream::kRegexAllCollectionsShowSystemEvents + : DocumentSourceChangeStream::kRegexAllCollections); +} + +std::string DocumentSourceChangeStream::getNsRegexForChangeStream( + const boost::intrusive_ptr& expCtx) { + const auto type = getChangeStreamType(expCtx->ns); + const auto& nss = expCtx->ns; switch (type) { case ChangeStreamType::kSingleCollection: // Match the target namespace exactly. return "^" + regexEscapeNsForChangeStream(nss.ns()) + "$"; case ChangeStreamType::kSingleDatabase: // Match all namespaces that start with db name, followed by ".", then NOT followed by - // '$' or 'system.' + // '$' or 'system.' unless 'showSystemEvents' is set. return "^" + regexEscapeNsForChangeStream(nss.db().toString()) + "\\." + - kRegexAllCollections; + resolveAllCollectionsRegex(expCtx); case ChangeStreamType::kAllChangesForCluster: // Match all namespaces that start with any db name other than admin, config, or local, - // followed by ".", then NOT followed by '$' or 'system.'. - return kRegexAllDBs + "\\." + kRegexAllCollections; + // followed by ".", then NOT '$' or 'system.' unless 'showSystemEvents' is set. + return kRegexAllDBs + "\\." + resolveAllCollectionsRegex(expCtx); default: MONGO_UNREACHABLE; } } -std::string DocumentSourceChangeStream::getCollRegexForChangeStream(const NamespaceString& nss) { - auto type = getChangeStreamType(nss); +std::string DocumentSourceChangeStream::getCollRegexForChangeStream( + const boost::intrusive_ptr& expCtx) { + const auto type = getChangeStreamType(expCtx->ns); + const auto& nss = expCtx->ns; switch (type) { case ChangeStreamType::kSingleCollection: // Match the target collection exactly. @@ -156,14 +174,16 @@ std::string DocumentSourceChangeStream::getCollRegexForChangeStream(const Namesp case ChangeStreamType::kSingleDatabase: case ChangeStreamType::kAllChangesForCluster: // Match any collection; database filtering will be done elsewhere. - return "^" + kRegexAllCollections; + return "^" + resolveAllCollectionsRegex(expCtx); default: MONGO_UNREACHABLE; } } -std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream(const NamespaceString& nss) { - auto type = getChangeStreamType(nss); +std::string DocumentSourceChangeStream::getCmdNsRegexForChangeStream( + const boost::intrusive_ptr& expCtx) { + const auto type = getChangeStreamType(expCtx->ns); + const auto& nss = expCtx->ns; switch (type) { case ChangeStreamType::kSingleCollection: case ChangeStreamType::kSingleDatabase: @@ -232,6 +252,9 @@ list> DocumentSourceChangeStream::createFromBson( // Make sure that it is legal to run this $changeStream before proceeding. DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec); + // Save a copy of the spec on the expression context. Used when building the oplog filter. + expCtx->changeStreamSpec = spec; + // If we see this stage on a shard, it means that the raw $changeStream stage was dispatched to // us from an old mongoS. Build a legacy shard pipeline. if (expCtx->needsMerge) { @@ -376,7 +399,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( uassert(6188501, "the 'featureFlagChangeStreamsVisibility' should be enabled to use " - "'showEnhancedEvents:true' in the change stream spec", + "'showExpandedEvents:true' in the change stream spec", feature_flags::gFeatureFlagChangeStreamsVisibility.isEnabledAndIgnoreFCV() || !spec.getShowExpandedEvents()); @@ -386,6 +409,12 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( feature_flags::gFeatureFlagChangeStreamsVisibility.isEnabledAndIgnoreFCV() || !spec.getShowRawUpdateDescription()); + uassert(6189301, + "the 'featureFlagChangeStreamsVisibility' should be enabled to use " + "'showSystemEvents:true' in the change stream spec", + feature_flags::gFeatureFlagChangeStreamsVisibility.isEnabledAndIgnoreFCV() || + !spec.getShowSystemEvents()); + uassert(31123, "Change streams from mongos may not show migration events", !(expCtx->inMongos && spec.getShowMigrationEvents())); diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 93831c6b8a5..ae281596679 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -121,6 +121,12 @@ public: "API Version 1", _spec.Obj()[DocumentSourceChangeStreamSpec::kShowRawUpdateDescriptionFieldName] .eoo()); + + uassert( + ErrorCodes::APIStrictError, + "The 'showSystemEvents' parameter to $changeStream is not supported in API " + "Version 1", + _spec.Obj()[DocumentSourceChangeStreamSpec::kShowSystemEventsFieldName].eoo()); } } @@ -229,7 +235,12 @@ public: static constexpr StringData kDropIndexesOpType = "dropIndexes"_sd; static constexpr StringData kShardCollectionOpType = "shardCollection"_sd; + // Default regex for collections match which prohibits system collections. static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd; + // Regex matching all regular collections plus certain system collections. + static constexpr StringData kRegexAllCollectionsShowSystemEvents = + R"((?!(\$|system\.(?!(js$)))))"_sd; + static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd; static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd; @@ -239,9 +250,14 @@ public: * Helpers for Determining which regex to match a change stream against. */ static ChangeStreamType getChangeStreamType(const NamespaceString& nss); - static std::string getNsRegexForChangeStream(const NamespaceString& nss); - static std::string getCollRegexForChangeStream(const NamespaceString& nss); - static std::string getCmdNsRegexForChangeStream(const NamespaceString& nss); + static std::string getNsRegexForChangeStream( + const boost::intrusive_ptr& expCtx); + static std::string getCollRegexForChangeStream( + const boost::intrusive_ptr& expCtx); + static std::string getCmdNsRegexForChangeStream( + const boost::intrusive_ptr& expCtx); + static StringData resolveAllCollectionsRegex( + const boost::intrusive_ptr& expCtx); static std::string regexEscapeNsForChangeStream(StringData source); diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index 69abc61d4a8..e1b8227a0fc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -150,6 +150,12 @@ structs: of data. Instead they reflect this data moving from one shard to another. + showSystemEvents: + cpp_name: showSystemEvents + type: optionalBool + description: A flag indicating whether the stream should report events on system + collections. + allowToRunOnConfigDB: cpp_name: allowToRunOnConfigDB type: optionalBool diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index c449888f726..9334cd4ecff 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -41,6 +41,7 @@ #include "mongo/db/exec/document_value/value_comparator.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/pipeline/javascript_execution.h" #include "mongo/db/pipeline/legacy_runtime_constants_gen.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" @@ -451,6 +452,9 @@ public: // When non-empty, contains the unmodified user provided aggregation command. BSONObj originalAggregateCommand; + // If present, the spec associated with the current change stream pipeline. + boost::optional changeStreamSpec; + // True if the expression context is the original one for a given pipeline. // False if another context is created for the same pipeline. Used to disable duplicate // expression counting. diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index 0c60275819e..d19d47493ab 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -600,6 +600,16 @@ Mongo.prototype._extractChangeStreamOptions = function(options) { delete options.showExpandedEvents; } + if (options.hasOwnProperty("showSystemEvents")) { + changeStreamOptions.showSystemEvents = options.showSystemEvents; + delete options.showSystemEvents; + } + + if (options.hasOwnProperty("showRawUpdateDescription")) { + changeStreamOptions.showRawUpdateDescription = options.showRawUpdateDescription; + delete options.showRawUpdateDescription; + } + return [{$changeStream: changeStreamOptions}, options]; }; -- cgit v1.2.1