summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-10-26 14:52:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-29 16:20:39 +0000
commit8e4194f6b6704b088eebe5f380026c9ade9ddbcb (patch)
tree736a8d9f6e7d1a74a827115c3b86b34708171e5f
parente8aafb99f494c29e6ed05e3696a9cd2bb6b3ea5b (diff)
downloadmongo-8e4194f6b6704b088eebe5f380026c9ade9ddbcb.tar.gz
SERVER-60957 Support change stream pre and post images in a sharded cluster.
-rw-r--r--jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js (renamed from jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js)48
-rw-r--r--jstests/change_streams/lookup_pit_pre_and_post_image.js18
-rw-r--r--jstests/change_streams/lookup_pre_image.js30
-rw-r--r--jstests/libs/change_stream_util.js13
-rw-r--r--jstests/libs/override_methods/implicit_whole_cluster_changestreams.js8
-rw-r--r--jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp20
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h5
8 files changed, 109 insertions, 43 deletions
diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js
index 847642da319..c9b959abee1 100644
--- a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
+++ b/jstests/change_streams/change_stream_pre_image_lookup_whole_db_whole_cluster.js
@@ -4,26 +4,52 @@
* specifies a pipeline that filters out changes to any collections which do not
* have pre-images enabled.
*
- * @tags: [uses_change_streams, requires_replication]
+ * @tags: [
+ * uses_change_streams,
+ * # TODO SERVER-58694: remove this tag.
+ * change_stream_does_not_expect_txns,
+ * # TODO SERVER-60238: remove this tag.
+ * assumes_read_preference_unchanged
+ * ]
*/
(function() {
"use strict";
-const rst = new ReplSetTest({nodes: 1});
-rst.startSet();
-rst.initiate();
+load("jstests/libs/change_stream_util.js"); // For canRecordPreImagesInConfigDatabase.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-const testDB = rst.getPrimary().getDB(jsTestName());
-const adminDB = rst.getPrimary().getDB("admin");
+const testDB = db.getSiblingDB(jsTestName());
+const adminDB = db.getSiblingDB("admin");
+const collWithPreImageName = "coll_with_pre_images";
+const collWithNoPreImageName = "coll_with_no_pre_images";
+const canRecordPreImagesInConfigDb = canRecordPreImagesInConfigDatabase(testDB);
+
+if (!canRecordPreImagesInConfigDb && FixtureHelpers.isMongos(db)) {
+ jsTestLog("Skipping test as pre image lookup is not supported in sharded cluster with feature" +
+ "flag 'featureFlagChangeStreamPreAndPostImages' disabled.");
+ return;
+}
+
+assert.commandWorked(testDB.dropDatabase());
// Create one collection that has pre-image recording enabled...
-const collWithPreImages = testDB.coll_with_pre_images;
-assert.commandWorked(testDB.createCollection(collWithPreImages.getName(), {recordPreImages: true}));
+if (!canRecordPreImagesInConfigDb) {
+ assert.commandWorked(testDB.createCollection(collWithPreImageName, {recordPreImages: true}));
+} else {
+ assert.commandWorked(testDB.createCollection(collWithPreImageName,
+ {changeStreamPreAndPostImages: {enabled: true}}));
+}
//... and one collection which has pre-images disabled.
+if (!canRecordPreImagesInConfigDb) {
+ assert.commandWorked(testDB.createCollection(collWithNoPreImageName, {recordPreImages: false}));
+} else {
+ assert.commandWorked(testDB.createCollection(collWithNoPreImageName,
+ {changeStreamPreAndPostImages: {enabled: false}}));
+}
+
+const collWithPreImages = testDB.coll_with_pre_images;
const collWithNoPreImages = testDB.coll_with_no_pre_images;
-assert.commandWorked(
- testDB.createCollection(collWithNoPreImages.getName(), {recordPreImages: false}));
//... and a collection that will hold the sentinal document that marks the end of changes
const sentinelColl = testDB.sentinelColl;
@@ -103,6 +129,4 @@ for (let runOnDB of [testDB, adminDB]) {
assert.eq(observedEvent.fullDocumentBeforeChange, expectedEvent.fullDocumentBeforeChange);
}
}
-
-rst.stopSet();
})();
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js
index 69153720d61..b06dba63bb6 100644
--- a/jstests/change_streams/lookup_pit_pre_and_post_image.js
+++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js
@@ -1,24 +1,22 @@
// Tests that the point-in-time pre- and post-images are loaded correctly in $changeStream running
// with different arguments for collections with 'changeStreamPreAndPostImages' being enabled.
// @tags: [
-// assumes_against_mongod_not_mongos,
-// change_stream_does_not_expect_txns,
-// multiversion_incompatible,
+// # TODO SERVER-58694: remove this tag.
+// change_stream_does_not_expect_txns,
+// multiversion_incompatible,
+// # TODO SERVER-60238: remove this tag.
+// assumes_read_preference_unchanged
// ]
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
-load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled.
+load("jstests/libs/change_stream_util.js"); // For canRecordPreImagesInConfigDatabase.
const testDB = db.getSiblingDB(jsTestName());
const collName = "test";
-const clusteredIndexesEnabled =
- assert.commandWorked(testDB.adminCommand({getParameter: 1, featureFlagClusteredIndexes: 1}))
- .featureFlagClusteredIndexes.value;
-
-if (!(isChangeStreamPreAndPostImagesEnabled(db) && clusteredIndexesEnabled)) {
+if (!canRecordPreImagesInConfigDatabase(testDB)) {
const coll = assertDropAndRecreateCollection(testDB, collName);
// If feature flag is off, creating changeStream with new fullDocument arguments should throw.
@@ -27,7 +25,7 @@ if (!(isChangeStreamPreAndPostImagesEnabled(db) && clusteredIndexesEnabled)) {
assert.throwsWithCode(() => coll.watch([], {fullDocument: 'required'}), ErrorCodes.BadValue);
jsTestLog(
- 'Skipping test because featureFlagChangeStreamPreAndPostImages or featureFlagClusteredIndexes feature flag is not enabled');
+ "Skipping test because pre-image recording capability in 'system.preimages' is not enabled.");
return;
}
diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js
index 120c6de5671..f85a46a9e16 100644
--- a/jstests/change_streams/lookup_pre_image.js
+++ b/jstests/change_streams/lookup_pre_image.js
@@ -2,25 +2,42 @@
* Tests the behaviour of the 'fullDocumentBeforeChange' argument to the $changeStream stage.
*
* @tags: [
- * assumes_against_mongod_not_mongos,
* assumes_unsharded_collection,
* do_not_wrap_aggregations_in_facets,
* uses_multiple_connections,
* multiversion_incompatible,
+ * # TODO SERVER-58694: remove this tag.
+ * change_stream_does_not_expect_txns,
+ * # TODO SERVER-60238: remove this tag.
+ * assumes_read_preference_unchanged
* ]
*/
(function() {
"use strict";
-load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and
+ // canRecordPreImagesInConfigDatabase.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+const canRecordPreImagesInConfigDb = canRecordPreImagesInConfigDatabase(db);
+
+if (!canRecordPreImagesInConfigDb && FixtureHelpers.isMongos(db)) {
+ jsTestLog("Skipping test as pre image lookup is not supported in sharded cluster with feature" +
+ "flag 'featureFlagChangeStreamPreAndPostImages' disabled.");
+ return;
+}
+
const coll = assertDropAndRecreateCollection(db, "change_stream_pre_images");
const cst = new ChangeStreamTest(db);
// Enable pre-image recording on the test collection.
-assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: true}));
+if (!canRecordPreImagesInConfigDb) {
+ assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: true}));
+} else {
+ assert.commandWorked(
+ db.runCommand({collMod: coll.getName(), changeStreamPreAndPostImages: {enabled: true}}));
+}
// Open three streams on the collection, one for each "fullDocumentBeforeChange" mode.
const csNoPreImages = cst.startWatchingChanges({
@@ -97,7 +114,12 @@ assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor));
// Now disable pre-image generation on the test collection and re-test.
-assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: false}));
+if (!canRecordPreImagesInConfigDb) {
+ assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: false}));
+} else {
+ assert.commandWorked(
+ db.runCommand({collMod: coll.getName(), changeStreamPreAndPostImages: {enabled: false}}));
+}
// Test pre-image lookup for an insertion. No pre-image exists on any cursor.
assert.commandWorked(coll.insert({_id: "y"}));
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index de567d908f6..97f5bb60e21 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -45,6 +45,19 @@ function isChangeStreamsRewriteEnabled(db) {
}
/**
+ * Returns true if pre-images can be recorded in 'system.preimages' collection, false otherwise.
+ */
+function canRecordPreImagesInConfigDatabase(db) {
+ // Clustered index feature must be enabled to record pre-images in 'system.preimages'
+ // collection.
+ const clusteredIndexesEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagClusteredIndexes: 1}))
+ .featureFlagClusteredIndexes.value;
+
+ return isChangeStreamPreAndPostImagesEnabled(db) && clusteredIndexesEnabled;
+}
+
+/**
* Helper function used internally by ChangeStreamTest. If no passthrough is active, it is exactly
* the same as calling db.runCommand. If a passthrough is active and has defined a function
* 'changeStreamPassthroughAwareRunCommand', then this method will be overridden to allow individual
diff --git a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
index 9e13e0d0847..53fe1c6c8aa 100644
--- a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
+++ b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
@@ -58,7 +58,11 @@ ChangeStreamPassthroughHelpers.passthroughType = function() {
// we need to override the helper to ensure that the Mongo.watch function itself is exercised by the
// passthrough wherever Collection.watch or DB.watch is called.
DB.prototype.watch = function(pipeline, options) {
- pipeline = Object.assign([], pipeline);
- pipeline.unshift(ChangeStreamPassthroughHelpers.nsMatchFilter(this, 1));
+ // If the database being watched is 'admin', then don't update the pipeline. The pipeline in
+ // this case will update the 'ns.db' to 'admin' which will match nothing.
+ if (this.getName() !== "admin") {
+ pipeline = Object.assign([], pipeline);
+ pipeline.unshift(ChangeStreamPassthroughHelpers.nsMatchFilter(this, 1));
+ }
return this.getMongo().watch(pipeline, options);
};
diff --git a/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js b/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js
index 8f3738f4700..dd871885c6c 100644
--- a/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js
+++ b/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js
@@ -6,6 +6,8 @@
(function() {
'use strict';
+load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled.
+
const st = new ShardingTest({
shards: 1,
mongos: 1,
@@ -15,6 +17,14 @@ const st = new ShardingTest({
const shard = st.shard0;
const mongos = st.s;
+if (isChangeStreamPreAndPostImagesEnabled(mongos.getDB("test"))) {
+ jsTestLog(
+ "Skipping test as pre-image lookup is supported in sharded clusters with feature flag " +
+ "'featureFlagChangeStreamPreAndPostImages' enabled.");
+ st.stop();
+ return;
+}
+
// Test that we cannot create a collection with pre-images enabled in a sharded cluster.
assert.commandFailed(shard.getDB("test").runCommand({create: "test", recordPreImages: true}));
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 11b551ac755..b741b87a2a8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -279,14 +279,13 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
stages.push_back(DocumentSourceChangeStreamCheckTopologyChange::create(expCtx));
}
- // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
- // (after DSCSCheckTopologyChange) so that any $match stages which follow the $changeStream
+
+ // If 'fullDocumentBeforeChange' is not set to 'off', add the DSCSAddPreImage stage into the
+ // pipeline. We place this stage here so that any $match stages which follow the $changeStream
// pipeline may be able to skip ahead of the DSCSAddPreImage stage. This allows a whole-db or
// whole-cluster stream to run on an instance where only some collections have pre-images
// enabled, so long as the user filters for only those namespaces.
- // TODO SERVER-36941: figure out how to get this to work in a sharded cluster.
if (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff) {
- invariant(!expCtx->inMongos);
stages.push_back(DocumentSourceChangeStreamAddPreImage::create(expCtx, spec));
}
@@ -350,16 +349,15 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
<< (spec.getAllowToRunOnSystemNS() ? " through mongos" : ""),
!expCtx->ns.isSystem() || (spec.getAllowToRunOnSystemNS() && !expCtx->inMongos));
- // TODO SERVER-36941: We do not currently support sharded pre-image lookup.
- const bool shouldAddPreImage =
- (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
- uassert(51771,
- "the 'fullDocumentBeforeChange' option is not supported in a sharded cluster",
- !(shouldAddPreImage && (expCtx->inMongos || expCtx->needsMerge)));
-
// TODO SERVER-58584: remove the feature flag.
if (!feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabled(
serverGlobalParams.featureCompatibility)) {
+ const bool shouldAddPreImage =
+ (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
+ uassert(51771,
+ "the 'fullDocumentBeforeChange' option is not supported in a sharded cluster",
+ !(shouldAddPreImage && (expCtx->inMongos || expCtx->needsMerge)));
+
uassert(ErrorCodes::BadValue,
str::stream() << "Specified value '"
<< FullDocumentMode_serializer(spec.getFullDocument())
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h
index 8e9c764fe19..cc735fc135e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.h
@@ -94,11 +94,8 @@ public:
return constraints;
}
- // Conceptually, this stage must always run on the shards part of the pipeline. At
- // present, however, pre-image retrieval is not supported in a sharded cluster, and
- // so the distributed plan logic will not be used.
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- return DistributedPlanLogic{this, nullptr, boost::none};
+ return boost::none;
}
DepsTracker::State getDependencies(DepsTracker* deps) const {