summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2020-02-10 10:16:59 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-10 13:08:24 +0000
commit1a8fea4b5a2fe7ec14d88d1ce4e7c8a1eaaa03cd (patch)
tree2c15f4dc5129ffa398ff4f07938754b62bbfb8a8
parent6c9c0b19d5980f065e1ff2ad624bb8d18bb88fe5 (diff)
downloadmongo-1a8fea4b5a2fe7ec14d88d1ce4e7c8a1eaaa03cd.tar.gz
SERVER-45807 Add change stream stage to fetch pre-image for update/replace/delete events
create mode 100644 jstests/change_streams/lookup_pre_image.js create mode 100644 jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js create mode 100644 jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js create mode 100644 src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp create mode 100644 src/mongo/db/pipeline/document_source_lookup_change_pre_image.h
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml3
-rw-r--r--jstests/change_streams/lookup_pre_image.js137
-rw-r--r--jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js109
-rw-r--r--jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js38
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp27
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.idl33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp348
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h3
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp137
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_pre_image.h130
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp53
-rw-r--r--src/mongo/shell/mongo.js10
26 files changed, 1049 insertions, 22 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index f3e3a089fbc..62e5d1a2e1c 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -14,6 +14,8 @@ selector:
- assumes_write_concern_unchanged
# Transactions not supported on sharded clusters.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index dbf3a7a093f..05357a578b5 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -14,6 +14,8 @@ selector:
- assumes_write_concern_unchanged
# Transactions not supported on sharded clusters.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml
index 88ce52e3d1b..56f91e843c0 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml
@@ -15,6 +15,8 @@ selector:
# These tests make assumptions about change stream results that are no longer true once operations
# get bundled into transactions.
- change_stream_does_not_expect_txns
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml
index 2b5f6f4e5a8..d4b7fee5122 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml
@@ -15,6 +15,9 @@ selector:
# These tests make assumptions about change stream results that are no longer true once operations
# get bundled into transactions.
- change_stream_does_not_expect_txns
+ # Exclude any tests that on't support sharding
+ - assumes_against_mongod_not_mongos,
+ - assumes_unsharded_collection
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index 54da2e53fdd..e1dc1e9a6fa 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -27,6 +27,8 @@ selector:
- assumes_read_preference_unchanged
# Transactions not supported on sharded cluster.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index a737ec614bf..7a7cdd7320f 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -15,6 +15,9 @@ selector:
- assumes_write_concern_unchanged
# Transactions not supported on sharded clusters.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
+ - assumes_unsharded_collection
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index 3f702c1bb5c..968d2cd6b13 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -16,6 +16,8 @@ selector:
- uses_transactions
# Not relevant for whole-cluster change streams.
- do_not_run_in_whole_cluster_passthrough
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index cb6c8263015..368b1d4d538 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -29,6 +29,8 @@ selector:
- uses_transactions
# Not relevant for whole-cluster change streams.
- do_not_run_in_whole_cluster_passthrough
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
index 838a57c1e3e..4d38135bc0f 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -16,6 +16,9 @@ selector:
- uses_transactions
# Not relevant for whole-cluster change streams.
- do_not_run_in_whole_cluster_passthrough
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
+ - assumes_unsharded_collection
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index 9c4d11901da..d9cd88e7a98 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -17,6 +17,8 @@ selector:
- assumes_write_concern_unchanged
# Transactions not supported on sharded clusters.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index 1b6fde42245..88f63128bab 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -29,6 +29,8 @@ selector:
- assumes_read_preference_unchanged
# Transactions not supported on sharded cluster.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
index 2b3b5ed4064..a861822f3b2 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
@@ -17,6 +17,9 @@ selector:
- assumes_write_concern_unchanged
# Transactions not supported on sharded clusters.
- uses_transactions
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
+ - assumes_unsharded_collection
executor:
archive:
diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js
new file mode 100644
index 00000000000..905ec6f1227
--- /dev/null
+++ b/jstests/change_streams/lookup_pre_image.js
@@ -0,0 +1,137 @@
+/**
+ * Tests the behaviour of the 'fullDocumentBeforeChange' argument to the $changeStream stage.
+ *
+ * @tags: [
+ * do_not_wrap_aggregations_in_facets,
+ * uses_multiple_connections,
+ * requires_fcv_44,
+ * assumes_against_mongod_not_mongos,
+ * assumes_unsharded_collection
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+const coll = assertDropAndRecreateCollection(db, "change_stream_pre_images");
+const cst = new ChangeStreamTest(db);
+
+// Enable pre-image recording on the test collection.
+assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: true}));
+
+// Open three streams on the collection, one for each "fullDocumentBeforeChange" mode.
+const csNoPreImages = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {"fullDocumentBeforeChange": "off", fullDocument: "updateLookup"}}]
+});
+const csPreImageWhenAvailableCursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [
+ {$changeStream: {"fullDocumentBeforeChange": "whenAvailable", fullDocument: "updateLookup"}}
+ ]
+});
+const csPreImageRequiredCursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline:
+ [{$changeStream: {fullDocumentBeforeChange: "required", fullDocument: "updateLookup"}}]
+});
+
+// Test pre-image lookup for an insertion. No pre-image exists on any cursor.
+assert.commandWorked(coll.insert({_id: "x"}));
+let latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "insert");
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+assert.docEq(latestChange.fullDocument, {_id: "x"});
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor));
+
+// Test pre-image lookup for a replacement operation.
+assert.commandWorked(coll.update({_id: "x"}, {foo: "bar"}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "replace");
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+assert.docEq(latestChange.fullDocument, {_id: "x", foo: "bar"});
+// Add the expected "fullDocumentBeforeChange" and confirm that both pre-image cursors see it.
+latestChange.fullDocumentBeforeChange = {
+ _id: "x"
+};
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor));
+
+// Test pre-image lookup for an op-style update operation.
+assert.commandWorked(coll.update({_id: "x"}, {$set: {foo: "baz"}}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "update");
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+assert.docEq(latestChange.fullDocument, {_id: "x", foo: "baz"});
+assert.docEq(latestChange.updateDescription, {updatedFields: {foo: "baz"}, removedFields: []});
+// Add the expected "fullDocumentBeforeChange" and confirm that both pre-image cursors see it.
+latestChange.fullDocumentBeforeChange = {
+ _id: "x",
+ foo: "bar"
+};
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor));
+
+// Test pre-image lookup for a delete operation.
+assert.commandWorked(coll.remove({_id: "x"}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "delete");
+assert(!latestChange.hasOwnProperty("fullDocument"));
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+// Add the expected "fullDocumentBeforeChange" and confirm that both pre-image cursors see it.
+latestChange.fullDocumentBeforeChange = {
+ _id: "x",
+ foo: "baz"
+};
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor));
+
+// Now disable pre-image generation on the test collection and re-test.
+assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: false}));
+
+// Test pre-image lookup for an insertion. No pre-image exists on any cursor.
+assert.commandWorked(coll.insert({_id: "y"}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "insert");
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+assert.docEq(latestChange.fullDocument, {_id: "y"});
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor));
+
+// Test pre-image lookup for a replacement operation.
+assert.commandWorked(coll.update({_id: "y"}, {foo: "bar"}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "replace");
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+assert.docEq(latestChange.fullDocument, {_id: "y", foo: "bar"});
+// The "whenAvailable" cursor retrieves a document without the pre-image...
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+// ... but the "required" cursor throws an exception.
+const csErr = assert.throws(() => cst.getOneChange(csPreImageRequiredCursor));
+assert.eq(csErr.code, 51770);
+
+// Test pre-image lookup for an op-style update operation.
+assert.commandWorked(coll.update({_id: "y"}, {$set: {foo: "baz"}}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "update");
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+assert.docEq(latestChange.fullDocument, {_id: "y", foo: "baz"});
+assert.docEq(latestChange.updateDescription, {updatedFields: {foo: "baz"}, removedFields: []});
+// The "whenAvailable" cursor returns an event without the pre-image.
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+
+// Test pre-image lookup for a delete operation.
+assert.commandWorked(coll.remove({_id: "y"}));
+latestChange = cst.getOneChange(csNoPreImages);
+assert.eq(latestChange.operationType, "delete");
+assert(!latestChange.hasOwnProperty("fullDocument"));
+assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange"));
+// The "whenAvailable" cursor returns an event without the pre-image.
+assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
+
+assertDropCollection(db, coll.getName());
+})();
diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
new file mode 100644
index 00000000000..dc5b9ee9d3f
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
@@ -0,0 +1,109 @@
+/**
+ * Tests that a whole-db or whole-cluster change stream can succeed when the
+ * "fullDocumentBeforeChange" option is set to "required", so long as the user
+ * specifies a pipeline that filters out changes to any collections which do not
+ * have pre-images enabled.
+ *
+ * @tags: [uses_change_streams, requires_replication]
+ */
+(function() {
+"use strict";
+
+const rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+
+const testDB = rst.getPrimary().getDB(jsTestName());
+const adminDB = rst.getPrimary().getDB("admin");
+
+// Create one collection that has pre-image recording enabled...
+const collWithPreImages = testDB.coll_with_pre_images;
+assert.commandWorked(testDB.createCollection(collWithPreImages.getName(), {recordPreImages: true}));
+
+//... and one collection which has pre-images disabled.
+const collWithNoPreImages = testDB.coll_with_no_pre_images;
+assert.commandWorked(
+ testDB.createCollection(collWithNoPreImages.getName(), {recordPreImages: false}));
+
+//... and a collection that will hold the sentinal document that marks the end of changes
+const sentinelColl = testDB.sentinelColl;
+
+// Insert one document as a starting point and extract its resume token.
+const resumeToken = (() => {
+ const csCursor = collWithNoPreImages.watch();
+ assert.commandWorked(collWithNoPreImages.insert({_id: -1}));
+ assert.soon(() => csCursor.hasNext());
+ return csCursor.next()._id;
+})();
+
+// Write a series of interleaving operations to each collection.
+assert.commandWorked(collWithNoPreImages.insert({_id: 0}));
+assert.commandWorked(collWithPreImages.insert({_id: 0}));
+
+assert.commandWorked(collWithNoPreImages.update({_id: 0}, {foo: "bar"}));
+assert.commandWorked(collWithPreImages.update({_id: 0}, {foo: "bar"}));
+
+assert.commandWorked(collWithNoPreImages.update({_id: 0}, {$set: {foo: "baz"}}));
+assert.commandWorked(collWithPreImages.update({_id: 0}, {$set: {foo: "baz"}}));
+
+assert.commandWorked(collWithNoPreImages.remove({_id: 0}));
+assert.commandWorked(collWithPreImages.remove({_id: 0}));
+
+// This will generate an insert change event we can wait for on the change stream that indicates
+// we have reached the end of changes this test is interested in.
+assert.commandWorked(sentinelColl.insert({_id: "last_change_sentinel"}));
+
+// Confirm that attempting to open a whole-db stream on this database with mode "required" fails.
+const csWholeDBErr = assert.throws(function() {
+ const wholeDBStream =
+ testDB.watch([], {fullDocumentBeforeChange: "required", resumeAfter: resumeToken});
+
+ return assert.soon(() => wholeDBStream.hasNext() &&
+ wholeDBStream.next().documentKey._id === "last_change_sentinel");
+});
+assert.eq(csWholeDBErr.code, 51770);
+
+// Confirm that attempting to open a whole-cluster stream on with mode "required" fails.
+const csWholeClusterErr = assert.throws(function() {
+ const wholeClusterStream = adminDB.watch([], {
+ fullDocumentBeforeChange: "required",
+ resumeAfter: resumeToken,
+ allChangesForCluster: true,
+ });
+
+ return assert.soon(() => wholeClusterStream.hasNext() &&
+ wholeClusterStream.next().documentKey._id == "last_change_sentinel");
+});
+assert.eq(csWholeClusterErr.code, 51770);
+
+// However, if we open a whole-db or whole-cluster stream that filters for only the namespace with
+// pre-images, then the cursor can proceed. This is because the $match gets moved ahead of the
+// pre-image lookup stage, so no events from 'collWithNoPreImages' ever reach it, and therefore
+// don't trip the validation checks for the existence of the pre-image.
+for (let runOnDB of [testDB, adminDB]) {
+ // Open a whole-db or whole-cluster stream that filters for the 'collWithPreImages' namespace.
+ const csCursor = runOnDB.watch([{$match: {"ns.coll": collWithPreImages.getName()}}], {
+ fullDocumentBeforeChange: "required",
+ resumeAfter: resumeToken,
+ allChangesForCluster: (runOnDB === adminDB)
+ });
+
+ // The list of events and pre-images that we expect to see in the stream.
+ const expectedPreImageEvents = [
+ {opType: "insert", fullDocumentBeforeChange: null},
+ {opType: "replace", fullDocumentBeforeChange: {_id: 0}},
+ {opType: "update", fullDocumentBeforeChange: {_id: 0, foo: "bar"}},
+ {opType: "delete", fullDocumentBeforeChange: {_id: 0, foo: "baz"}}
+ ];
+
+ // Confirm that the expected events are all seen, and in the expected order.
+ for (let expectedEvent of expectedPreImageEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const observedEvent = csCursor.next();
+ assert.eq(observedEvent.operationType, expectedEvent.opType);
+ assert.eq(observedEvent.fullDocumentBeforeChange, expectedEvent.fullDocumentBeforeChange);
+ }
+}
+
+rst.stopSet();
+})();
diff --git a/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js b/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js
new file mode 100644
index 00000000000..8f3738f4700
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js
@@ -0,0 +1,38 @@
+/**
+ * Test that mongoS rejects change streams which request 'fullDocumentBeforeChange' pre-images.
+ *
+ * @tags: [uses_change_streams, requires_replication]
+ */
+(function() {
+'use strict';
+
+const st = new ShardingTest({
+ shards: 1,
+ mongos: 1,
+ config: 1,
+});
+
+const shard = st.shard0;
+const mongos = st.s;
+
+// Test that we cannot create a collection with pre-images enabled in a sharded cluster.
+assert.commandFailed(shard.getDB("test").runCommand({create: "test", recordPreImages: true}));
+
+// Test that attempting to run $changeStream with {fullDocumentBeforeChange: "whenAvailable"} fails.
+assert.commandFailedWithCode(mongos.getDB("test").runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {fullDocumentBeforeChange: "whenAvailable"}}],
+ cursor: {}
+}),
+ 51771);
+
+// Test that attempting to run $changeStream with {fullDocumentBeforeChange: "required"} fails.
+assert.commandFailedWithCode(mongos.getDB("test").runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {fullDocumentBeforeChange: "required"}}],
+ cursor: {}
+}),
+ 51771);
+
+st.stop();
+}());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 45dceed8cf2..a2f6fdd8544 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -230,6 +230,7 @@ pipelineEnv.Library(
'document_source_list_sessions.cpp',
'document_source_lookup.cpp',
'document_source_lookup_change_post_image.cpp',
+ 'document_source_lookup_change_pre_image.cpp',
'document_source_match.cpp',
'document_source_merge.cpp',
'document_source_out.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 153bc685a70..e1b5597bd43 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
@@ -72,6 +73,7 @@ REGISTER_MULTI_STAGE_ALIAS(changeStream,
DocumentSourceChangeStream::createFromBson);
constexpr StringData DocumentSourceChangeStream::kDocumentKeyField;
+constexpr StringData DocumentSourceChangeStream::kFullDocumentBeforeChangeField;
constexpr StringData DocumentSourceChangeStream::kFullDocumentField;
constexpr StringData DocumentSourceChangeStream::kIdField;
constexpr StringData DocumentSourceChangeStream::kNamespaceField;
@@ -470,19 +472,38 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
auto fullDocOption = spec.getFullDocument();
uassert(40575,
str::stream() << "unrecognized value for the 'fullDocument' option to the "
- "$changeStream stage. Expected \"default\" or "
- "\"updateLookup\", got \""
- << fullDocOption << "\"",
+ "$changeStream stage. Expected 'default' or 'updateLookup', got '"
+ << fullDocOption << "'",
fullDocOption == "updateLookup"_sd || fullDocOption == "default"_sd);
const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd);
+ const bool shouldLookupPreImage =
+ (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
+
+ // TODO SERVER-36941: We do not currently support sharded pre-image lookup.
+ uassert(51771,
+ "the 'fullDocumentBeforeChange' option is not supported in a sharded cluster",
+ !(shouldLookupPreImage && (expCtx->inMongos || expCtx->needsMerge)));
+
auto stages = buildPipeline(expCtx, spec, elem);
+
if (!expCtx->needsMerge) {
// There should only be one close cursor stage. If we're on the shards and producing input
// to be merged, do not add a close cursor stage, since the mongos will already have one.
stages.push_back(DocumentSourceCloseCursor::create(expCtx));
+ // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here
+ // so that any $match stages which follow the $changeStream pipeline prefix may be able to
+ // skip ahead of the DSLPreImage stage. This allows a whole-db or whole-cluster stream to
+ // run on an instance where only some collections have pre-images enabled, so long as the
+ // user filters for only those namespaces.
+ // TODO SERVER-36941: figure out how to get this to work in a sharded cluster.
+ if (shouldLookupPreImage) {
+ invariant(!expCtx->inMongos);
+ stages.push_back(DocumentSourceLookupChangePreImage::create(expCtx, spec));
+ }
+
// There should be only one post-image lookup stage. If we're on the shards and producing
// input to be merged, the lookup is done on the mongos.
if (shouldLookupPostImage) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 9570f2e16e1..1b538cbd256 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -102,6 +102,9 @@ public:
// after the transformation.
static constexpr StringData kDocumentKeyField = "documentKey"_sd;
+ // The name of the field where the pre-image document will be found, if requested and available.
+ static constexpr StringData kFullDocumentBeforeChangeField = "fullDocumentBeforeChange"_sd;
+
// The name of the field where the full document will be found after the transformation. The
// full document is only present for certain types of operations, such as an insert.
static constexpr StringData kFullDocumentField = "fullDocument"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index 410e5ab9f15..485a42cef9e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -51,6 +51,19 @@ types:
serializer: ResumeToken::toBSON_do_not_use
deserializer: ResumeToken::parse
+enums:
+ FullDocumentBeforeChangeMode:
+ description: Possible modes for the 'fullDocumentBeforeChange' parameter of the
+ $changeStream stage.
+ type: string
+ values:
+ # Do not supply pre-images.
+ kOff: "off"
+ # Supply a pre-image if available. Otherwise, omit the output field.
+ kWhenAvailable: "whenAvailable"
+ # Pre-images are required. Throw an exception if not available.
+ kRequired: "required"
+
structs:
ResumeTokenClusterTime:
description: The IDL type of cluster time
@@ -86,12 +99,28 @@ structs:
description: The operation time after which we should start reporting changes.
Only one of resumeAfter, _resumeAfterClusterTimeDeprecated, and
startAtOperationTime should be specified.
+
fullDocument:
cpp_name: fullDocument
type: string
default: '"default"'
description: A string '"updateLookup"' or '"default"', indicating whether or not we
should return a full document or just changes for an update.
+
+ fullDocumentBeforeChange:
+ cpp_name: fullDocumentBeforeChange
+ type: FullDocumentBeforeChangeMode
+ default: kOff
+ description: Valid values are "off", "whenAvailable", or "required". If set to
+ "off", the "fullDocumentBeforeChange" field of the output document
+ is always omitted. If set to "whenAvailable", the
+ "fullDocumentBeforeChange" field will be populated with the
+ pre-image of the document modified by the current change event if
+ such a pre-image is available, and will be omitted otherwise. If
+ set to "required", then the "fullDocumentBeforeChange" field is
+ always populated and an exception is thrown if the pre-image is not
+ available.
+
allChangesForCluster:
cpp_name: allChangesForCluster
type: bool
@@ -99,6 +128,7 @@ structs:
description: A flag indicating whether the stream should report all changes that
occur on the deployment, aside from those on internal databases or
collections.
+
showMigrationEvents:
cpp_name: showMigrationEvents
type: bool
@@ -109,6 +139,7 @@ structs:
deletes may appear that do not reflect actual deletions or insertions
of data. Instead they reflect this data moving from one shard to
another.
+
allowToRunOnConfigDB:
cpp_name: allowToRunOnConfigDB
type: bool
@@ -116,4 +147,4 @@ structs:
description: A flag indicating whether the change stream may be opened on the
'config' database, which is usually banned. This flag is used
internally to allow mongoS to open a stream on 'config.shards', in
- order to monitor for the addition of new shards to the cluster. \ No newline at end of file
+ order to monitor for the addition of new shards to the cluster.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index de0e757ab69..ae5b676769a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -87,6 +87,12 @@ public:
struct MockMongoInterface final : public StubMongoProcessInterface {
+ // Used by operations which need to obtain the oplog's UUID.
+ static const UUID& oplogUuid() {
+ static const UUID* oplog_uuid = new UUID(UUID::gen());
+ return *oplog_uuid;
+ }
+
// This mock iterator simulates a traversal of transaction history in the oplog by returning
// mock oplog entries from a list.
struct MockTransactionHistoryIterator : public TransactionHistoryIteratorBase {
@@ -109,8 +115,11 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
};
MockMongoInterface(std::vector<FieldPath> fields,
- std::vector<repl::OplogEntry> transactionEntries = {})
- : _fields(std::move(fields)), _transactionEntries(std::move(transactionEntries)) {}
+ std::vector<repl::OplogEntry> transactionEntries = {},
+ std::vector<Document> documentsForLookup = {})
+ : _fields(std::move(fields)),
+ _transactionEntries(std::move(transactionEntries)),
+ _documentsForLookup{std::move(documentsForLookup)} {}
// For tests of transactions that involve multiple oplog entries.
std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
@@ -130,6 +139,28 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
return iterator;
}
+ // Called by DocumentSourceLookupPreImage to obtain the UUID of the oplog. Since that's the only
+ // piece of collection info we need for now, just return a BSONObj with the mock oplog UUID.
+ BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) {
+ return BSON("uuid" << oplogUuid());
+ }
+
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern,
+ bool allowSpeculativeMajorityRead) final {
+ Matcher matcher(documentKey.toBson(), expCtx);
+ auto it = std::find_if(_documentsForLookup.begin(),
+ _documentsForLookup.end(),
+ [&](const Document& lookedUpDoc) {
+ return matcher.matches(lookedUpDoc.toBson(), nullptr);
+ });
+ return (it != _documentsForLookup.end() ? *it : boost::optional<Document>{});
+ }
+
// For "insert" tests.
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext*, const NamespaceString&, UUID) const final {
@@ -148,6 +179,9 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
// These entries are stored in the order they would be returned by the
// TransactionHistoryIterator, which is the _reverse_ of the order they appear in the oplog.
std::vector<repl::OplogEntry> _transactionEntries;
+
+ // These documents are used to feed the 'lookupSingleDocument' method.
+ std::vector<Document> _documentsForLookup;
};
class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup {
@@ -169,12 +203,13 @@ public:
std::vector<FieldPath> docKeyFields = {},
const BSONObj& spec = kDefaultSpec,
const boost::optional<Document> expectedInvalidate = {},
- const std::vector<repl::OplogEntry> transactionEntries = {}) {
+ const std::vector<repl::OplogEntry> transactionEntries = {},
+ std::vector<Document> documentsForLookup = {}) {
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec);
auto closeCursor = stages.back();
- getExpCtx()->mongoProcessInterface =
- std::make_unique<MockMongoInterface>(docKeyFields, transactionEntries);
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
+ docKeyFields, transactionEntries, std::move(documentsForLookup));
auto next = closeCursor->getNext();
// Match stage should pass the doc down if expectedDoc is given.
@@ -321,19 +356,20 @@ public:
return lsid;
}
-
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
*/
- static repl::OplogEntry makeOplogEntry(repl::OpTypeEnum opType,
- NamespaceString nss,
- BSONObj object,
- boost::optional<UUID> uuid = testUuid(),
- boost::optional<bool> fromMigrate = boost::none,
- boost::optional<BSONObj> object2 = boost::none,
- boost::optional<repl::OpTime> opTime = boost::none,
- OperationSessionInfo sessionInfo = {},
- boost::optional<repl::OpTime> prevOpTime = {}) {
+ static repl::OplogEntry makeOplogEntry(
+ repl::OpTypeEnum opType,
+ NamespaceString nss,
+ BSONObj object,
+ boost::optional<UUID> uuid = testUuid(),
+ boost::optional<bool> fromMigrate = boost::none,
+ boost::optional<BSONObj> object2 = boost::none,
+ boost::optional<repl::OpTime> opTime = boost::none,
+ OperationSessionInfo sessionInfo = {},
+ boost::optional<repl::OpTime> prevOpTime = {},
+ boost::optional<repl::OpTime> preImageOpTime = boost::none) {
long long hash = 1LL;
return repl::OplogEntry(opTime ? *opTime : kDefaultOpTime, // optime
hash, // hash
@@ -348,9 +384,9 @@ public:
boost::none, // upsert
Date_t(), // wall clock time
boost::none, // statement id
- prevOpTime, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none); // post-image optime
+ prevOpTime, // optime of previous write within same transaction
+ preImageOpTime, // pre-image optime
+ boost::none); // post-image optime
}
};
@@ -1966,6 +2002,282 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate);
}
+TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
+ // Set the pre-image opTime to 1 second prior to the default event optime.
+ repl::OpTime preImageOpTime{Timestamp(kDefaultTs.getSecs() - 1, 1), 1};
+ const auto preImageObj = BSON("_id" << 1 << "x" << 2);
+
+ // The documentKey for the main change stream event.
+ const auto documentKey = BSON("_id" << 1);
+
+ // The mock oplog UUID used by MockMongoInterface.
+ auto oplogUUID = MockMongoInterface::oplogUuid();
+
+ // Create an oplog entry for the pre-image no-op event.
+ auto preImageEntry = makeOplogEntry(OpTypeEnum::kNoop,
+ NamespaceString::kRsOplogNamespace,
+ preImageObj, // o
+ oplogUUID, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ preImageOpTime // opTime
+ );
+
+ // Create an oplog entry for the delete event that will look up the pre-image.
+ auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete,
+ nss,
+ documentKey, // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ kDefaultOpTime, // opTime
+ {}, // sessionInfo
+ {}, // prevOpTime
+ preImageOpTime // preImageOpTime
+ );
+
+ // Add the preImage oplog entry into a vector of documents that will be looked up. Add a dummy
+ // entry before it so that we know we are finding the pre-image based on the given timestamp.
+ repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm};
+ std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()},
+ Document{preImageEntry.toBSON()}};
+
+ // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available.
+ auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "off"));
+ Document expectedDeleteNoPreImage{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, documentKey},
+ };
+ checkTransformation(
+ deleteEntry, expectedDeleteNoPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "whenAvailable"));
+ Document expectedDeleteWithPreImage{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, documentKey},
+ };
+ checkTransformation(
+ deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ checkTransformation(
+ deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image, we see the event
+ // without the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "whenAvailable"));
+ checkTransformation(deleteEntry, expectedDeleteNoPreImage, {}, spec);
+
+ // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the
+ // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, {}, spec),
+ AssertionException,
+ ErrorCodes::ChangeStreamHistoryLost);
+}
+
+TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
+ // Set the pre-image opTime to 1 second prior to the default event optime.
+ repl::OpTime preImageOpTime{Timestamp(kDefaultTs.getSecs() - 1, 1), 1};
+
+ // Define the pre-image object, the update operation spec, and the document key.
+ const auto updateSpec = BSON("$unset" << BSON("x" << 1));
+ const auto preImageObj = BSON("_id" << 1 << "x" << 2);
+ const auto documentKey = BSON("_id" << 1);
+
+ // The mock oplog UUID used by MockMongoInterface.
+ auto oplogUUID = MockMongoInterface::oplogUuid();
+
+ // Create an oplog entry for the pre-image no-op event.
+ auto preImageEntry = makeOplogEntry(OpTypeEnum::kNoop,
+ NamespaceString::kRsOplogNamespace,
+ preImageObj, // o
+ oplogUUID, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ preImageOpTime // opTime
+ );
+
+ // Create an oplog entry for the update event that will look up the pre-image.
+ auto updateEntry = makeOplogEntry(OpTypeEnum::kUpdate,
+ nss,
+ updateSpec, // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ documentKey, // o2
+ kDefaultOpTime, // opTime
+ {}, // sessionInfo
+ {}, // prevOpTime
+ preImageOpTime // preImageOpTime
+ );
+
+ // Add the preImage oplog entry into a vector of documents that will be looked up. Add a dummy
+ // entry before it so that we know we are finding the pre-image based on the given timestamp.
+ repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm};
+ std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()},
+ Document{preImageEntry.toBSON()}};
+
+ // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available.
+ auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "off"));
+ Document expectedUpdateNoPreImage{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, documentKey},
+ {
+ "updateDescription",
+ D{{"updatedFields", D{}}, {"removedFields", vector<V>{V("x"_sd)}}},
+ },
+ };
+ checkTransformation(
+ updateEntry, expectedUpdateNoPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "whenAvailable"));
+ Document expectedUpdateWithPreImage{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, documentKey},
+ {
+ "updateDescription",
+ D{{"updatedFields", D{}}, {"removedFields", vector<V>{V("x"_sd)}}},
+ },
+ };
+ checkTransformation(
+ updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ checkTransformation(
+ updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image, we see the event
+ // without the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "whenAvailable"));
+ checkTransformation(updateEntry, expectedUpdateNoPreImage, {}, spec);
+
+ // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the
+ // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, {}, spec),
+ AssertionException,
+ ErrorCodes::ChangeStreamHistoryLost);
+}
+
+TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
+ // Set the pre-image opTime to 1 second prior to the default event optime.
+ repl::OpTime preImageOpTime{Timestamp(kDefaultTs.getSecs() - 1, 1), 1};
+
+ // Define the pre-image object, the replacement document, and the document key.
+ const auto replacementDoc = BSON("_id" << 1 << "y" << 3);
+ const auto preImageObj = BSON("_id" << 1 << "x" << 2);
+ const auto documentKey = BSON("_id" << 1);
+
+ // The mock oplog UUID used by MockMongoInterface.
+ auto oplogUUID = MockMongoInterface::oplogUuid();
+
+ // Create an oplog entry for the pre-image no-op event.
+ auto preImageEntry = makeOplogEntry(OpTypeEnum::kNoop,
+ NamespaceString::kRsOplogNamespace,
+ preImageObj, // o
+ oplogUUID, // uuid
+ boost::none, // fromMigrate
+ boost::none, // o2
+ preImageOpTime // opTime
+ );
+
+ // Create an oplog entry for the replacement event that will look up the pre-image.
+ auto replaceEntry = makeOplogEntry(OpTypeEnum::kUpdate,
+ nss,
+ replacementDoc, // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ documentKey, // o2
+ kDefaultOpTime, // opTime
+ {}, // sessionInfo
+ {}, // prevOpTime
+ preImageOpTime // preImageOpTime
+ );
+
+ // Add the preImage oplog entry into a vector of documents that will be looked up. Add a dummy
+ // entry before it so that we know we are finding the pre-image based on the given timestamp.
+ repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm};
+ std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()},
+ Document{preImageEntry.toBSON()}};
+
+ // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available.
+ auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "off"));
+ Document expectedReplaceNoPreImage{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kFullDocumentField, replacementDoc},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, documentKey},
+ };
+ checkTransformation(
+ replaceEntry, expectedReplaceNoPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "whenAvailable"));
+ Document expectedReplaceWithPreImage{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kFullDocumentField, replacementDoc},
+ {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, documentKey},
+ };
+ checkTransformation(
+ replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ checkTransformation(
+ replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+
+ // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image, we see the event
+ // without the pre-image.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "whenAvailable"));
+ checkTransformation(replaceEntry, expectedReplaceNoPreImage, {}, spec);
+
+ // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the
+ // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost.
+ spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, {}, spec),
+ AssertionException,
+ ErrorCodes::ChangeStreamHistoryLost);
+}
+
TEST_F(ChangeStreamStageDBTest, MatchFiltersOperationsOnSystemCollections) {
NamespaceString systemColl(nss.db() + ".system.users");
OplogEntry insert = makeOplogEntry(OpTypeEnum::kInsert, systemColl, BSON("_id" << 1));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 20ceab62bd3..5c875c06741 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -92,6 +92,10 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
_changeStreamSpec);
+ // If the change stream spec requested a pre-image, make sure that we supply one.
+ _includePreImageOptime =
+ (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
+
// If the change stream spec includes a resumeToken with a shard key, populate the document key
// cache with the field paths.
auto resumeAfter = spec.getResumeAfter();
@@ -188,6 +192,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
Value ns = input[repl::OplogEntry::kNssFieldName];
checkValueType(ns, repl::OplogEntry::kNssFieldName, BSONType::String);
Value uuid = input[repl::OplogEntry::kUuidFieldName];
+ Value preImageOpTime = input[repl::OplogEntry::kPreImageOpTimeFieldName];
std::vector<FieldPath> documentKeyFields;
// Deal with CRUD operations and commands.
@@ -338,7 +343,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
return doc.freeze();
}
+ // Add the post-image, pre-image, namespace, documentKey and other fields as appropriate.
doc.addField(DocumentSourceChangeStream::kFullDocumentField, fullDocument);
+ if (_includePreImageOptime) {
+ // Set 'kFullDocumentBeforeChangeField' to the pre-image optime. The DSCSLookupPreImage
+ // stage will replace this optime with the actual pre-image taken from the oplog.
+ doc.addField(DocumentSourceChangeStream::kFullDocumentBeforeChangeField, preImageOpTime);
+ }
doc.addField(DocumentSourceChangeStream::kNamespaceField,
operationType == DocumentSourceChangeStream::kDropDatabaseOpType
? Value(Document{{"db", nss.db()}})
@@ -384,6 +395,9 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac
deps->fields.insert(repl::OplogEntry::kSessionIdFieldName.toString());
deps->fields.insert(repl::OplogEntry::kTermFieldName.toString());
deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString());
+ if (_includePreImageOptime) {
+ deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString());
+ }
return DepsTracker::State::EXHAUSTIVE_ALL;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index df8cecf49f0..693ddd6fa66 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -216,6 +216,9 @@ private:
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
+ // Set to true if the pre-image optime should be included in output documents.
+ bool _includePreImageOptime = false;
+
// '_fcv' is used to determine which version of the resume token to generate for each change.
// This is a snapshot of what the feature compatibility version was at the time the stream was
// opened or resumed.
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp
new file mode 100644
index 00000000000..af670df9961
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
+
+#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/db/repl/local_oplog_info.h"
+#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/util/intrusive_counter.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+constexpr StringData DocumentSourceLookupChangePreImage::kStageName;
+constexpr StringData DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName;
+
+boost::intrusive_ptr<DocumentSourceLookupChangePreImage> DocumentSourceLookupChangePreImage::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec) {
+ auto mode = spec.getFullDocumentBeforeChange();
+
+ return make_intrusive<DocumentSourceLookupChangePreImage>(expCtx, mode);
+}
+
+DocumentSource::GetNextResult DocumentSourceLookupChangePreImage::doGetNext() {
+ auto input = pSource->getNext();
+ if (!input.isAdvanced()) {
+ return input;
+ }
+
+ // If this is not an update, replace or delete, then just pass along the result.
+ const auto kOpTypeField = DocumentSourceChangeStream::kOperationTypeField;
+ const auto opType = input.getDocument()[kOpTypeField];
+ DocumentSourceChangeStream::checkValueType(opType, kOpTypeField, BSONType::String);
+ if (opType.getStringData() != DocumentSourceChangeStream::kUpdateOpType &&
+ opType.getStringData() != DocumentSourceChangeStream::kReplaceOpType &&
+ opType.getStringData() != DocumentSourceChangeStream::kDeleteOpType) {
+ return input;
+ }
+
+ // If a pre-image is available, the transform stage will have populated it in the event's
+ // 'fullDocumentBeforeChange' field. If this field is missing and the pre-imaging mode is
+ // 'required', we throw an exception. Otherwise, we pass along the document unmodified.
+ auto preImageOpTimeVal = input.getDocument()[kFullDocumentBeforeChangeFieldName];
+ if (preImageOpTimeVal.missing()) {
+ uassert(51770,
+ str::stream()
+ << "Change stream was configured to require a pre-image for all update, delete "
+ "and replace events, but no pre-image optime was recorded for event: "
+ << input.getDocument().toString(),
+ _fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kRequired);
+ return input;
+ }
+
+ // Look up the pre-image using the optime. This may return boost::none if it was not found.
+ auto preImageOpTime = repl::OpTime::parse(preImageOpTimeVal.getDocument().toBson());
+ auto preImageDoc = lookupPreImage(input.getDocument(), preImageOpTime);
+
+ // Even if no pre-image was found, we have to replace the 'fullDocumentBeforeChange' field.
+ MutableDocument outputDoc(input.releaseDocument());
+ outputDoc[kFullDocumentBeforeChangeFieldName] = (preImageDoc ? Value(*preImageDoc) : Value());
+
+ return outputDoc.freeze();
+}
+
+boost::optional<Document> DocumentSourceLookupChangePreImage::lookupPreImage(
+ const Document& inputDoc, const repl::OpTime& opTime) const {
+ // We need the oplog's UUID for lookup, so obtain the collection info via MongoProcessInterface.
+ auto localOplogInfo = pExpCtx->mongoProcessInterface->getCollectionOptions(
+ pExpCtx->opCtx, NamespaceString::kRsOplogNamespace);
+
+ // Extract the UUID from the collection information. We should always have a valid uuid here.
+ auto oplogUUID = invariantStatusOK(UUID::parse(localOplogInfo["uuid"]));
+
+ // Look up the pre-image oplog entry using the opTime as the query filter.
+ auto lookedUpDoc =
+ pExpCtx->mongoProcessInterface->lookupSingleDocument(pExpCtx,
+ NamespaceString::kRsOplogNamespace,
+ oplogUUID,
+ Document{opTime.asQuery()},
+ boost::none);
+
+ // Failing to find an oplog entry implies that the pre-image has rolled off the oplog. This is
+ // acceptable if the mode is "kWhenAvailable", but not if the mode is "kRequired".
+ if (!lookedUpDoc) {
+ uassert(
+ ErrorCodes::ChangeStreamHistoryLost,
+ str::stream()
+ << "Change stream was configured to require a pre-image for all update, delete and "
+ "replace events, but the pre-image was not found in the oplog for event: "
+ << inputDoc.toString(),
+ _fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kRequired);
+
+ // Return boost::none to signify that we (legally) failed to find the pre-image.
+ return boost::none;
+ }
+
+ // If we had an optime to look up, and we found an oplog entry with that timestamp, then we
+ // should always have a valid no-op entry containing a valid, non-empty pre-image document.
+ auto opLogEntry = invariantStatusOK(repl::OplogEntry::parse(lookedUpDoc->toBson()));
+ invariant(opLogEntry.getOpType() == repl::OpTypeEnum::kNoop);
+ invariant(!opLogEntry.getObject().isEmpty());
+
+ return Document{opLogEntry.getObject().getOwned()};
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h
new file mode 100644
index 00000000000..eea42b816fb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h
@@ -0,0 +1,130 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
+
+namespace mongo {
+
+/**
+ * Part of the change stream API machinery used to look up the pre-image of a document.
+ *
+ * After a document that should have its pre-image included is transformed from the oplog,
+ * its "fullDocumentBeforeChange" field shall be the optime of the noop oplog entry containing the
+ * pre-image. This stage replaces that field with the actual pre-image document.
+ */
+class DocumentSourceLookupChangePreImage final : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalLookupChangePreImage"_sd;
+ static constexpr StringData kFullDocumentBeforeChangeFieldName =
+ DocumentSourceChangeStream::kFullDocumentBeforeChangeField;
+
+ /**
+ * Creates a DocumentSourceLookupChangePostImage stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceLookupChangePreImage> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
+
+ DocumentSourceLookupChangePreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ FullDocumentBeforeChangeModeEnum mode)
+ : DocumentSource(kStageName, expCtx), _fullDocumentBeforeChangeMode(mode) {
+ // This stage should never be created with FullDocumentBeforeChangeMode::kOff.
+ invariant(_fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kOff);
+ }
+
+ /**
+ * Only modifies a single path: "fullDocumentBeforeChange".
+ */
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet,
+ {kFullDocumentBeforeChangeFieldName.toString()},
+ {}};
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ invariant(pipeState != Pipeline::SplitState::kSplitForShards);
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ LookupRequirement::kNotAllowed,
+ UnionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage);
+ constraints.canSwapWithMatch = true;
+ return constraints;
+ }
+
+ // Conceptually, this stage must always run on the shards part of the pipeline. At
+ // present, however, pre-image retrieval is not supported in a sharded cluster, and
+ // so the distributed plan logic will not be used.
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ return DistributedPlanLogic{this, nullptr, boost::none};
+ }
+
+ DepsTracker::State getDependencies(DepsTracker* deps) const {
+ deps->fields.insert(DocumentSourceChangeStream::kFullDocumentBeforeChangeField.toString());
+ // This stage does not restrict the output fields to a finite set, and has no impact on
+ // whether metadata is available or needed.
+ return DepsTracker::State::SEE_NEXT;
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ return (explain ? Value{Document{{kStageName, Document()}}} : Value());
+ }
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+private:
+ /**
+ * Performs the lookup to retrieve the full pre-image document for applicable operations.
+ */
+ GetNextResult doGetNext() final;
+
+ /**
+ * Looks up and returns a pre-image document at the specified opTime in the oplog. Returns
+ * boost::none if the mode is "kWhenAvailable" and no such oplog entry was found. Throws if the
+ * pre-image mode is "kRequired" and no entry was found. Invariants that if an oplog entry with
+ * the given opTime is found, it is a no-op entry with a valid non-empty pre-image document.
+ */
+ boost::optional<Document> lookupPreImage(const Document& inputDoc,
+ const repl::OpTime& opTime) const;
+
+ // Determines whether pre-images are strictly required or may be included only when available.
+ FullDocumentBeforeChangeModeEnum _fullDocumentBeforeChangeMode =
+ FullDocumentBeforeChangeModeEnum::kOff;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 5cc39e890bf..39276c064d4 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/pipeline/document_source_internal_split_pipeline.h"
#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_source_out.h"
@@ -1708,6 +1709,58 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline->getSources().back().get()));
}
+TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependentMatch) {
+ QueryTestServiceContext testServiceContext;
+ auto opCtx = testServiceContext.makeOperationContext();
+
+ intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
+ expCtx->opCtx = opCtx.get();
+ expCtx->uuid = UUID::gen();
+ setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
+
+ auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
+ ASSERT_EQ(stages.size(), 5UL);
+ // Make sure the pre-image lookup is at the end.
+ ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get()));
+
+ auto matchPredicate = BSON("extra"
+ << "predicate");
+ stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx));
+ auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ pipeline->optimizePipeline();
+
+ // Make sure the $match stage has swapped before the change look up.
+ ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(pipeline->getSources().back().get()));
+}
+
+TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPreImage) {
+ QueryTestServiceContext testServiceContext;
+ auto opCtx = testServiceContext.makeOperationContext();
+
+ intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
+ expCtx->opCtx = opCtx.get();
+ expCtx->uuid = UUID::gen();
+ setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
+
+ auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
+ << "required"));
+ auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
+ ASSERT_EQ(stages.size(), 5UL);
+ // Make sure the pre-image lookup is at the end.
+ ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get()));
+
+ stages.push_back(DocumentSourceMatch::create(
+ BSON(DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName << BSONNULL),
+ expCtx));
+ auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ pipeline->optimizePipeline();
+
+ // Make sure the $match stage stays at the end.
+ ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline->getSources().back().get()));
+}
+
TEST(PipelineOptimizationTest, SortLimProjLimBecomesTopKSortProj) {
std::string inputPipe =
"[{$sort: {a: 1}}"
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index 481aff7c6ad..2503a989d74 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -633,6 +633,16 @@ Mongo.prototype._extractChangeStreamOptions = function(options) {
delete options.startAtOperationTime;
}
+ if (options.hasOwnProperty("fullDocumentBeforeChange")) {
+ changeStreamOptions.fullDocumentBeforeChange = options.fullDocumentBeforeChange;
+ delete options.fullDocumentBeforeChange;
+ }
+
+ if (options.hasOwnProperty("allChangesForCluster")) {
+ changeStreamOptions.allChangesForCluster = options.allChangesForCluster;
+ delete options.allChangesForCluster;
+ }
+
return [{$changeStream: changeStreamOptions}, options];
};