From 7cd6beba9999a93f985a99199eb5b824b7f18c75 Mon Sep 17 00:00:00 2001 From: Drew Paroski Date: Tue, 20 Jul 2021 20:36:04 +0000 Subject: SERVER-58511 Create feature flag for change stream oplog rewrites --- jstests/change_streams/metadata_notifications.js | 4 ++-- jstests/change_streams/start_after_invalidation_exception.js | 4 ++-- jstests/change_streams/whole_db_metadata_notifications.js | 4 ++-- jstests/libs/change_stream_util.js | 11 ++++++++++- .../change_stream_lookup_post_image_chunk_migration.js | 4 ++-- jstests/sharding/change_stream_metadata_notifications.js | 4 ++-- jstests/sharding/change_stream_read_preference.js | 4 ++-- jstests/sharding/change_stream_update_lookup_read_concern.js | 4 ++-- src/mongo/db/query/query_feature_flags.idl | 5 +++++ 9 files changed, 29 insertions(+), 15 deletions(-) diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index 991c776553c..1fb91425b95 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -5,7 +5,7 @@ (function() { "use strict"; -load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled. +load("jstests/libs/change_stream_util.js"); // For isChangeStreamsOptimizationEnabled. load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/fixture_helpers.js"); // For isSharded. @@ -126,7 +126,7 @@ assert.commandFailedWithCode(db.runCommand({ // Test that if change stream optimization is enabled, then even after the 'invalidate' event has // been filtered out, the cursor should hold the resume token of the 'invalidate' event. -if (isChangeStreamOptimizationEnabled(db)) { +if (isChangeStreamsOptimizationEnabled(db)) { const resumeStream = coll.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: resumeToken}); assert.soon(() => { diff --git a/jstests/change_streams/start_after_invalidation_exception.js b/jstests/change_streams/start_after_invalidation_exception.js index af72f7455e1..1d948c91307 100644 --- a/jstests/change_streams/start_after_invalidation_exception.js +++ b/jstests/change_streams/start_after_invalidation_exception.js @@ -6,9 +6,9 @@ "use strict"; load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. -load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled. +load("jstests/libs/change_stream_util.js"); // For isChangeStreamsOptimizationEnabled. -if (!isChangeStreamOptimizationEnabled(db)) { +if (!isChangeStreamsOptimizationEnabled(db)) { jsTestLog( 'Skipping test because featureFlagChangeStreamsOptimization feature flag is not enabled'); return; diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js index ff557cd0469..02089e5f042 100644 --- a/jstests/change_streams/whole_db_metadata_notifications.js +++ b/jstests/change_streams/whole_db_metadata_notifications.js @@ -6,7 +6,7 @@ "use strict"; load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest, - // isChangeStreamOptimizationEnabled. + // isChangeStreamsOptimizationEnabled. load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. @@ -204,7 +204,7 @@ const invalidateEvent = cst.assertNextChangesEqual( // Test that if change stream optimization is enabled, then even after the 'invalidate' event has // been filtered out, the cursor should hold the resume token of the 'invalidate' event. -if (isChangeStreamOptimizationEnabled(testDB)) { +if (isChangeStreamsOptimizationEnabled(testDB)) { const resumeStream = testDB.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: change._id}); assert.soon(() => { diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index c7176c7f112..8daa0e8be29 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -17,12 +17,21 @@ const ChangeStreamWatchMode = Object.freeze({ /** * Returns true if feature flag 'featureFlagChangeStreamsOptimization' is enabled, false otherwise. */ -function isChangeStreamOptimizationEnabled(db) { +function isChangeStreamsOptimizationEnabled(db) { const getParam = db.adminCommand({getParameter: 1, featureFlagChangeStreamsOptimization: 1}); return getParam.hasOwnProperty("featureFlagChangeStreamsOptimization") && getParam.featureFlagChangeStreamsOptimization.value; } +/** + * Returns true if feature flag 'featureFlagChangeStreamsRewrite' is enabled, false otherwise. + */ +function isChangeStreamsRewriteEnabled(db) { + const getParam = db.adminCommand({getParameter: 1, featureFlagChangeStreamsRewrite: 1}); + return getParam.hasOwnProperty("featureFlagChangeStreamsRewrite") && + getParam.featureFlagChangeStreamsRewrite.value; +} + /** * Helper function used internally by ChangeStreamTest. If no passthrough is active, it is exactly * the same as calling db.runCommand. If a passthrough is active and has defined a function diff --git a/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js b/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js index 9e20d81c806..80a52c5bbb7 100644 --- a/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js +++ b/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js @@ -7,7 +7,7 @@ (function() { 'use strict'; -load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled(). +load('jstests/libs/change_stream_util.js'); // For isChangeStreamsOptimizationEnabled(). load('jstests/libs/profiler.js'); // For various profiler helpers. const st = new ShardingTest({ @@ -26,7 +26,7 @@ const shard0 = st.shard0; const shard1 = st.shard1; const shard2 = st.shard2; -const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongos.getDB(dbName)); +const isChangeStreamOptimized = isChangeStreamsOptimizationEnabled(mongos.getDB(dbName)); // Enable sharding to inform mongos of the database, and make sure all chunks start on shard 0. assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js index 1e3273bfe4a..d3d322a9899 100644 --- a/jstests/sharding/change_stream_metadata_notifications.js +++ b/jstests/sharding/change_stream_metadata_notifications.js @@ -4,7 +4,7 @@ // ] (function() { "use strict"; -load("jstests/libs/change_stream_util.js"); // For isChangeStreamOptimizationEnabled. +load("jstests/libs/change_stream_util.js"); // For isChangeStreamsOptimizationEnabled. load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. @@ -18,7 +18,7 @@ const st = new ShardingTest({ const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; -const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB); +const isChangeStreamOptimized = isChangeStreamsOptimizationEnabled(mongosDB); assert.commandWorked(mongosDB.dropDatabase()); diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js index c1f6cd55a76..5676c24e4e3 100644 --- a/jstests/sharding/change_stream_read_preference.js +++ b/jstests/sharding/change_stream_read_preference.js @@ -8,7 +8,7 @@ (function() { "use strict"; -load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled(). +load('jstests/libs/change_stream_util.js'); // For isChangeStreamsOptimizationEnabled(). load('jstests/libs/profiler.js'); // For various profiler helpers. const st = new ShardingTest({ @@ -63,7 +63,7 @@ assert.eq(primaryStream.next().fullDocument, {_id: -1, updated: true}); assert.soon(() => primaryStream.hasNext()); assert.eq(primaryStream.next().fullDocument, {_id: 1, updated: true}); -const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB); +const isChangeStreamOptimized = isChangeStreamsOptimizationEnabled(mongosDB); for (let rs of [st.rs0, st.rs1]) { const primaryDB = rs.getPrimary().getDB(dbName); diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js index fbe96e33bda..612a3347930 100644 --- a/jstests/sharding/change_stream_update_lookup_read_concern.js +++ b/jstests/sharding/change_stream_update_lookup_read_concern.js @@ -9,7 +9,7 @@ (function() { "use strict"; -load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled(). +load('jstests/libs/change_stream_util.js'); // For isChangeStreamsOptimizationEnabled(). load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers. load("jstests/replsets/rslib.js"); // For reconfig(). @@ -62,7 +62,7 @@ assert.commandWorked(st.s.adminCommand( const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; -const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB); +const isChangeStreamOptimized = isChangeStreamsOptimizationEnabled(mongosDB); assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); assert.commandWorked( diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl index b5bf83431af..a03fe739aae 100644 --- a/src/mongo/db/query/query_feature_flags.idl +++ b/src/mongo/db/query/query_feature_flags.idl @@ -35,6 +35,11 @@ feature_flags: cpp_varname: gFeatureFlagChangeStreamsOptimization default: false + featureFlagChangeStreamsRewrite: + description: "Feature flag for enabling change streams oplog rewrites" + cpp_varname: gFeatureFlagChangeStreamsRewrite + default: false + featureFlagDensify: description: "Feature flag for allowing a $densify stage in an aggregation pipeline" cpp_varname: gFeatureFlagDensify -- cgit v1.2.1