diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2020-02-10 10:16:59 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-10 13:08:24 +0000 |
commit | 1a8fea4b5a2fe7ec14d88d1ce4e7c8a1eaaa03cd (patch) | |
tree | 2c15f4dc5129ffa398ff4f07938754b62bbfb8a8 | |
parent | 6c9c0b19d5980f065e1ff2ad624bb8d18bb88fe5 (diff) | |
download | mongo-1a8fea4b5a2fe7ec14d88d1ce4e7c8a1eaaa03cd.tar.gz |
SERVER-45807 Add change stream stage to fetch pre-image for update/replace/delete events
create mode 100644 jstests/change_streams/lookup_pre_image.js
create mode 100644 jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
create mode 100644 jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js
create mode 100644 src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp
create mode 100644 src/mongo/db/pipeline/document_source_lookup_change_pre_image.h
26 files changed, 1049 insertions, 22 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index f3e3a089fbc..62e5d1a2e1c 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -14,6 +14,8 @@ selector: - assumes_write_concern_unchanged # Transactions not supported on sharded clusters. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index dbf3a7a093f..05357a578b5 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -14,6 +14,8 @@ selector: - assumes_write_concern_unchanged # Transactions not supported on sharded clusters. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml index 88ce52e3d1b..56f91e843c0 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml @@ -15,6 +15,8 @@ selector: # These tests make assumptions about change stream results that are no longer true once operations # get bundled into transactions. - change_stream_does_not_expect_txns + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml index 2b5f6f4e5a8..d4b7fee5122 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml @@ -15,6 +15,9 @@ selector: # These tests make assumptions about change stream results that are no longer true once operations # get bundled into transactions. - change_stream_does_not_expect_txns + # Exclude any tests that on't support sharding + - assumes_against_mongod_not_mongos, + - assumes_unsharded_collection executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index 54da2e53fdd..e1dc1e9a6fa 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -27,6 +27,8 @@ selector: - assumes_read_preference_unchanged # Transactions not supported on sharded cluster. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index a737ec614bf..7a7cdd7320f 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -15,6 +15,9 @@ selector: - assumes_write_concern_unchanged # Transactions not supported on sharded clusters. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos + - assumes_unsharded_collection executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index 3f702c1bb5c..968d2cd6b13 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -16,6 +16,8 @@ selector: - uses_transactions # Not relevant for whole-cluster change streams. - do_not_run_in_whole_cluster_passthrough + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index cb6c8263015..368b1d4d538 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -29,6 +29,8 @@ selector: - uses_transactions # Not relevant for whole-cluster change streams. - do_not_run_in_whole_cluster_passthrough + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml index 838a57c1e3e..4d38135bc0f 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml @@ -16,6 +16,9 @@ selector: - uses_transactions # Not relevant for whole-cluster change streams. - do_not_run_in_whole_cluster_passthrough + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos + - assumes_unsharded_collection executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index 9c4d11901da..d9cd88e7a98 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -17,6 +17,8 @@ selector: - assumes_write_concern_unchanged # Transactions not supported on sharded clusters. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index 1b6fde42245..88f63128bab 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -29,6 +29,8 @@ selector: - assumes_read_preference_unchanged # Transactions not supported on sharded cluster. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos executor: archive: diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml index 2b3b5ed4064..a861822f3b2 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml @@ -17,6 +17,9 @@ selector: - assumes_write_concern_unchanged # Transactions not supported on sharded clusters. - uses_transactions + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos + - assumes_unsharded_collection executor: archive: diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js new file mode 100644 index 00000000000..905ec6f1227 --- /dev/null +++ b/jstests/change_streams/lookup_pre_image.js @@ -0,0 +1,137 @@ +/** + * Tests the behaviour of the 'fullDocumentBeforeChange' argument to the $changeStream stage. + * + * @tags: [ + * do_not_wrap_aggregations_in_facets, + * uses_multiple_connections, + * requires_fcv_44, + * assumes_against_mongod_not_mongos, + * assumes_unsharded_collection + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + +const coll = assertDropAndRecreateCollection(db, "change_stream_pre_images"); +const cst = new ChangeStreamTest(db); + +// Enable pre-image recording on the test collection. +assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: true})); + +// Open three streams on the collection, one for each "fullDocumentBeforeChange" mode. +const csNoPreImages = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: {"fullDocumentBeforeChange": "off", fullDocument: "updateLookup"}}] +}); +const csPreImageWhenAvailableCursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [ + {$changeStream: {"fullDocumentBeforeChange": "whenAvailable", fullDocument: "updateLookup"}} + ] +}); +const csPreImageRequiredCursor = cst.startWatchingChanges({ + collection: coll, + pipeline: + [{$changeStream: {fullDocumentBeforeChange: "required", fullDocument: "updateLookup"}}] +}); + +// Test pre-image lookup for an insertion. No pre-image exists on any cursor. +assert.commandWorked(coll.insert({_id: "x"})); +let latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "insert"); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +assert.docEq(latestChange.fullDocument, {_id: "x"}); +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); +assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor)); + +// Test pre-image lookup for a replacement operation. +assert.commandWorked(coll.update({_id: "x"}, {foo: "bar"})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "replace"); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +assert.docEq(latestChange.fullDocument, {_id: "x", foo: "bar"}); +// Add the expected "fullDocumentBeforeChange" and confirm that both pre-image cursors see it. +latestChange.fullDocumentBeforeChange = { + _id: "x" +}; +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); +assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor)); + +// Test pre-image lookup for an op-style update operation. +assert.commandWorked(coll.update({_id: "x"}, {$set: {foo: "baz"}})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "update"); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +assert.docEq(latestChange.fullDocument, {_id: "x", foo: "baz"}); +assert.docEq(latestChange.updateDescription, {updatedFields: {foo: "baz"}, removedFields: []}); +// Add the expected "fullDocumentBeforeChange" and confirm that both pre-image cursors see it. +latestChange.fullDocumentBeforeChange = { + _id: "x", + foo: "bar" +}; +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); +assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor)); + +// Test pre-image lookup for a delete operation. +assert.commandWorked(coll.remove({_id: "x"})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "delete"); +assert(!latestChange.hasOwnProperty("fullDocument")); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +// Add the expected "fullDocumentBeforeChange" and confirm that both pre-image cursors see it. +latestChange.fullDocumentBeforeChange = { + _id: "x", + foo: "baz" +}; +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); +assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor)); + +// Now disable pre-image generation on the test collection and re-test. +assert.commandWorked(db.runCommand({collMod: coll.getName(), recordPreImages: false})); + +// Test pre-image lookup for an insertion. No pre-image exists on any cursor. +assert.commandWorked(coll.insert({_id: "y"})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "insert"); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +assert.docEq(latestChange.fullDocument, {_id: "y"}); +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); +assert.docEq(latestChange, cst.getOneChange(csPreImageRequiredCursor)); + +// Test pre-image lookup for a replacement operation. +assert.commandWorked(coll.update({_id: "y"}, {foo: "bar"})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "replace"); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +assert.docEq(latestChange.fullDocument, {_id: "y", foo: "bar"}); +// The "whenAvailable" cursor retrieves a document without the pre-image... +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); +// ... but the "required" cursor throws an exception. +const csErr = assert.throws(() => cst.getOneChange(csPreImageRequiredCursor)); +assert.eq(csErr.code, 51770); + +// Test pre-image lookup for an op-style update operation. +assert.commandWorked(coll.update({_id: "y"}, {$set: {foo: "baz"}})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "update"); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +assert.docEq(latestChange.fullDocument, {_id: "y", foo: "baz"}); +assert.docEq(latestChange.updateDescription, {updatedFields: {foo: "baz"}, removedFields: []}); +// The "whenAvailable" cursor returns an event without the pre-image. +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); + +// Test pre-image lookup for a delete operation. +assert.commandWorked(coll.remove({_id: "y"})); +latestChange = cst.getOneChange(csNoPreImages); +assert.eq(latestChange.operationType, "delete"); +assert(!latestChange.hasOwnProperty("fullDocument")); +assert(!latestChange.hasOwnProperty("fullDocumentBeforeChange")); +// The "whenAvailable" cursor returns an event without the pre-image. +assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); + +assertDropCollection(db, coll.getName()); +})(); diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js new file mode 100644 index 00000000000..dc5b9ee9d3f --- /dev/null +++ b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js @@ -0,0 +1,109 @@ +/** + * Tests that a whole-db or whole-cluster change stream can succeed when the + * "fullDocumentBeforeChange" option is set to "required", so long as the user + * specifies a pipeline that filters out changes to any collections which do not + * have pre-images enabled. + * + * @tags: [uses_change_streams, requires_replication] + */ +(function() { +"use strict"; + +const rst = new ReplSetTest({nodes: 1}); +rst.startSet(); +rst.initiate(); + +const testDB = rst.getPrimary().getDB(jsTestName()); +const adminDB = rst.getPrimary().getDB("admin"); + +// Create one collection that has pre-image recording enabled... +const collWithPreImages = testDB.coll_with_pre_images; +assert.commandWorked(testDB.createCollection(collWithPreImages.getName(), {recordPreImages: true})); + +//... and one collection which has pre-images disabled. +const collWithNoPreImages = testDB.coll_with_no_pre_images; +assert.commandWorked( + testDB.createCollection(collWithNoPreImages.getName(), {recordPreImages: false})); + +//... and a collection that will hold the sentinal document that marks the end of changes +const sentinelColl = testDB.sentinelColl; + +// Insert one document as a starting point and extract its resume token. +const resumeToken = (() => { + const csCursor = collWithNoPreImages.watch(); + assert.commandWorked(collWithNoPreImages.insert({_id: -1})); + assert.soon(() => csCursor.hasNext()); + return csCursor.next()._id; +})(); + +// Write a series of interleaving operations to each collection. +assert.commandWorked(collWithNoPreImages.insert({_id: 0})); +assert.commandWorked(collWithPreImages.insert({_id: 0})); + +assert.commandWorked(collWithNoPreImages.update({_id: 0}, {foo: "bar"})); +assert.commandWorked(collWithPreImages.update({_id: 0}, {foo: "bar"})); + +assert.commandWorked(collWithNoPreImages.update({_id: 0}, {$set: {foo: "baz"}})); +assert.commandWorked(collWithPreImages.update({_id: 0}, {$set: {foo: "baz"}})); + +assert.commandWorked(collWithNoPreImages.remove({_id: 0})); +assert.commandWorked(collWithPreImages.remove({_id: 0})); + +// This will generate an insert change event we can wait for on the change stream that indicates +// we have reached the end of changes this test is interested in. +assert.commandWorked(sentinelColl.insert({_id: "last_change_sentinel"})); + +// Confirm that attempting to open a whole-db stream on this database with mode "required" fails. +const csWholeDBErr = assert.throws(function() { + const wholeDBStream = + testDB.watch([], {fullDocumentBeforeChange: "required", resumeAfter: resumeToken}); + + return assert.soon(() => wholeDBStream.hasNext() && + wholeDBStream.next().documentKey._id === "last_change_sentinel"); +}); +assert.eq(csWholeDBErr.code, 51770); + +// Confirm that attempting to open a whole-cluster stream on with mode "required" fails. +const csWholeClusterErr = assert.throws(function() { + const wholeClusterStream = adminDB.watch([], { + fullDocumentBeforeChange: "required", + resumeAfter: resumeToken, + allChangesForCluster: true, + }); + + return assert.soon(() => wholeClusterStream.hasNext() && + wholeClusterStream.next().documentKey._id == "last_change_sentinel"); +}); +assert.eq(csWholeClusterErr.code, 51770); + +// However, if we open a whole-db or whole-cluster stream that filters for only the namespace with +// pre-images, then the cursor can proceed. This is because the $match gets moved ahead of the +// pre-image lookup stage, so no events from 'collWithNoPreImages' ever reach it, and therefore +// don't trip the validation checks for the existence of the pre-image. +for (let runOnDB of [testDB, adminDB]) { + // Open a whole-db or whole-cluster stream that filters for the 'collWithPreImages' namespace. + const csCursor = runOnDB.watch([{$match: {"ns.coll": collWithPreImages.getName()}}], { + fullDocumentBeforeChange: "required", + resumeAfter: resumeToken, + allChangesForCluster: (runOnDB === adminDB) + }); + + // The list of events and pre-images that we expect to see in the stream. + const expectedPreImageEvents = [ + {opType: "insert", fullDocumentBeforeChange: null}, + {opType: "replace", fullDocumentBeforeChange: {_id: 0}}, + {opType: "update", fullDocumentBeforeChange: {_id: 0, foo: "bar"}}, + {opType: "delete", fullDocumentBeforeChange: {_id: 0, foo: "baz"}} + ]; + + // Confirm that the expected events are all seen, and in the expected order. + for (let expectedEvent of expectedPreImageEvents) { + assert.soon(() => csCursor.hasNext()); + const observedEvent = csCursor.next(); + assert.eq(observedEvent.operationType, expectedEvent.opType); + assert.eq(observedEvent.fullDocumentBeforeChange, expectedEvent.fullDocumentBeforeChange); + } +} + +rst.stopSet(); +})(); diff --git a/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js b/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js new file mode 100644 index 00000000000..8f3738f4700 --- /dev/null +++ b/jstests/noPassthrough/change_stream_preimages_fail_on_mongos.js @@ -0,0 +1,38 @@ +/** + * Test that mongoS rejects change streams which request 'fullDocumentBeforeChange' pre-images. + * + * @tags: [uses_change_streams, requires_replication] + */ +(function() { +'use strict'; + +const st = new ShardingTest({ + shards: 1, + mongos: 1, + config: 1, +}); + +const shard = st.shard0; +const mongos = st.s; + +// Test that we cannot create a collection with pre-images enabled in a sharded cluster. +assert.commandFailed(shard.getDB("test").runCommand({create: "test", recordPreImages: true})); + +// Test that attempting to run $changeStream with {fullDocumentBeforeChange: "whenAvailable"} fails. +assert.commandFailedWithCode(mongos.getDB("test").runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {fullDocumentBeforeChange: "whenAvailable"}}], + cursor: {} +}), + 51771); + +// Test that attempting to run $changeStream with {fullDocumentBeforeChange: "required"} fails. +assert.commandFailedWithCode(mongos.getDB("test").runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {fullDocumentBeforeChange: "required"}}], + cursor: {} +}), + 51771); + +st.stop(); +}()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 45dceed8cf2..a2f6fdd8544 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -230,6 +230,7 @@ pipelineEnv.Library( 'document_source_list_sessions.cpp', 'document_source_lookup.cpp', 'document_source_lookup_change_post_image.cpp', + 'document_source_lookup_change_pre_image.cpp', 'document_source_match.cpp', 'document_source_merge.cpp', 'document_source_out.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 153bc685a70..e1b5597bd43 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" @@ -72,6 +73,7 @@ REGISTER_MULTI_STAGE_ALIAS(changeStream, DocumentSourceChangeStream::createFromBson); constexpr StringData DocumentSourceChangeStream::kDocumentKeyField; +constexpr StringData DocumentSourceChangeStream::kFullDocumentBeforeChangeField; constexpr StringData DocumentSourceChangeStream::kFullDocumentField; constexpr StringData DocumentSourceChangeStream::kIdField; constexpr StringData DocumentSourceChangeStream::kNamespaceField; @@ -470,19 +472,38 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( auto fullDocOption = spec.getFullDocument(); uassert(40575, str::stream() << "unrecognized value for the 'fullDocument' option to the " - "$changeStream stage. Expected \"default\" or " - "\"updateLookup\", got \"" - << fullDocOption << "\"", + "$changeStream stage. Expected 'default' or 'updateLookup', got '" + << fullDocOption << "'", fullDocOption == "updateLookup"_sd || fullDocOption == "default"_sd); const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd); + const bool shouldLookupPreImage = + (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff); + + // TODO SERVER-36941: We do not currently support sharded pre-image lookup. + uassert(51771, + "the 'fullDocumentBeforeChange' option is not supported in a sharded cluster", + !(shouldLookupPreImage && (expCtx->inMongos || expCtx->needsMerge))); + auto stages = buildPipeline(expCtx, spec, elem); + if (!expCtx->needsMerge) { // There should only be one close cursor stage. If we're on the shards and producing input // to be merged, do not add a close cursor stage, since the mongos will already have one. stages.push_back(DocumentSourceCloseCursor::create(expCtx)); + // We only create a pre-image lookup stage on a non-merging mongoD. We place this stage here + // so that any $match stages which follow the $changeStream pipeline prefix may be able to + // skip ahead of the DSLPreImage stage. This allows a whole-db or whole-cluster stream to + // run on an instance where only some collections have pre-images enabled, so long as the + // user filters for only those namespaces. + // TODO SERVER-36941: figure out how to get this to work in a sharded cluster. + if (shouldLookupPreImage) { + invariant(!expCtx->inMongos); + stages.push_back(DocumentSourceLookupChangePreImage::create(expCtx, spec)); + } + // There should be only one post-image lookup stage. If we're on the shards and producing // input to be merged, the lookup is done on the mongos. if (shouldLookupPostImage) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 9570f2e16e1..1b538cbd256 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -102,6 +102,9 @@ public: // after the transformation. static constexpr StringData kDocumentKeyField = "documentKey"_sd; + // The name of the field where the pre-image document will be found, if requested and available. + static constexpr StringData kFullDocumentBeforeChangeField = "fullDocumentBeforeChange"_sd; + // The name of the field where the full document will be found after the transformation. The // full document is only present for certain types of operations, such as an insert. static constexpr StringData kFullDocumentField = "fullDocument"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index 410e5ab9f15..485a42cef9e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -51,6 +51,19 @@ types: serializer: ResumeToken::toBSON_do_not_use deserializer: ResumeToken::parse +enums: + FullDocumentBeforeChangeMode: + description: Possible modes for the 'fullDocumentBeforeChange' parameter of the + $changeStream stage. + type: string + values: + # Do not supply pre-images. + kOff: "off" + # Supply a pre-image if available. Otherwise, omit the output field. + kWhenAvailable: "whenAvailable" + # Pre-images are required. Throw an exception if not available. + kRequired: "required" + structs: ResumeTokenClusterTime: description: The IDL type of cluster time @@ -86,12 +99,28 @@ structs: description: The operation time after which we should start reporting changes. Only one of resumeAfter, _resumeAfterClusterTimeDeprecated, and startAtOperationTime should be specified. + fullDocument: cpp_name: fullDocument type: string default: '"default"' description: A string '"updateLookup"' or '"default"', indicating whether or not we should return a full document or just changes for an update. + + fullDocumentBeforeChange: + cpp_name: fullDocumentBeforeChange + type: FullDocumentBeforeChangeMode + default: kOff + description: Valid values are "off", "whenAvailable", or "required". If set to + "off", the "fullDocumentBeforeChange" field of the output document + is always omitted. If set to "whenAvailable", the + "fullDocumentBeforeChange" field will be populated with the + pre-image of the document modified by the current change event if + such a pre-image is available, and will be omitted otherwise. If + set to "required", then the "fullDocumentBeforeChange" field is + always populated and an exception is thrown if the pre-image is not + available. + allChangesForCluster: cpp_name: allChangesForCluster type: bool @@ -99,6 +128,7 @@ structs: description: A flag indicating whether the stream should report all changes that occur on the deployment, aside from those on internal databases or collections. + showMigrationEvents: cpp_name: showMigrationEvents type: bool @@ -109,6 +139,7 @@ structs: deletes may appear that do not reflect actual deletions or insertions of data. Instead they reflect this data moving from one shard to another. + allowToRunOnConfigDB: cpp_name: allowToRunOnConfigDB type: bool @@ -116,4 +147,4 @@ structs: description: A flag indicating whether the change stream may be opened on the 'config' database, which is usually banned. This flag is used internally to allow mongoS to open a stream on 'config.shards', in - order to monitor for the addition of new shards to the cluster.
\ No newline at end of file + order to monitor for the addition of new shards to the cluster. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index de0e757ab69..ae5b676769a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -87,6 +87,12 @@ public: struct MockMongoInterface final : public StubMongoProcessInterface { + // Used by operations which need to obtain the oplog's UUID. + static const UUID& oplogUuid() { + static const UUID* oplog_uuid = new UUID(UUID::gen()); + return *oplog_uuid; + } + // This mock iterator simulates a traversal of transaction history in the oplog by returning // mock oplog entries from a list. struct MockTransactionHistoryIterator : public TransactionHistoryIteratorBase { @@ -109,8 +115,11 @@ struct MockMongoInterface final : public StubMongoProcessInterface { }; MockMongoInterface(std::vector<FieldPath> fields, - std::vector<repl::OplogEntry> transactionEntries = {}) - : _fields(std::move(fields)), _transactionEntries(std::move(transactionEntries)) {} + std::vector<repl::OplogEntry> transactionEntries = {}, + std::vector<Document> documentsForLookup = {}) + : _fields(std::move(fields)), + _transactionEntries(std::move(transactionEntries)), + _documentsForLookup{std::move(documentsForLookup)} {} // For tests of transactions that involve multiple oplog entries. std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( @@ -130,6 +139,28 @@ struct MockMongoInterface final : public StubMongoProcessInterface { return iterator; } + // Called by DocumentSourceLookupPreImage to obtain the UUID of the oplog. Since that's the only + // piece of collection info we need for now, just return a BSONObj with the mock oplog UUID. + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) { + return BSON("uuid" << oplogUuid()); + } + + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern, + bool allowSpeculativeMajorityRead) final { + Matcher matcher(documentKey.toBson(), expCtx); + auto it = std::find_if(_documentsForLookup.begin(), + _documentsForLookup.end(), + [&](const Document& lookedUpDoc) { + return matcher.matches(lookedUpDoc.toBson(), nullptr); + }); + return (it != _documentsForLookup.end() ? *it : boost::optional<Document>{}); + } + // For "insert" tests. std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( OperationContext*, const NamespaceString&, UUID) const final { @@ -148,6 +179,9 @@ struct MockMongoInterface final : public StubMongoProcessInterface { // These entries are stored in the order they would be returned by the // TransactionHistoryIterator, which is the _reverse_ of the order they appear in the oplog. std::vector<repl::OplogEntry> _transactionEntries; + + // These documents are used to feed the 'lookupSingleDocument' method. + std::vector<Document> _documentsForLookup; }; class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup { @@ -169,12 +203,13 @@ public: std::vector<FieldPath> docKeyFields = {}, const BSONObj& spec = kDefaultSpec, const boost::optional<Document> expectedInvalidate = {}, - const std::vector<repl::OplogEntry> transactionEntries = {}) { + const std::vector<repl::OplogEntry> transactionEntries = {}, + std::vector<Document> documentsForLookup = {}) { vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); auto closeCursor = stages.back(); - getExpCtx()->mongoProcessInterface = - std::make_unique<MockMongoInterface>(docKeyFields, transactionEntries); + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( + docKeyFields, transactionEntries, std::move(documentsForLookup)); auto next = closeCursor->getNext(); // Match stage should pass the doc down if expectedDoc is given. @@ -321,19 +356,20 @@ public: return lsid; } - /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. */ - static repl::OplogEntry makeOplogEntry(repl::OpTypeEnum opType, - NamespaceString nss, - BSONObj object, - boost::optional<UUID> uuid = testUuid(), - boost::optional<bool> fromMigrate = boost::none, - boost::optional<BSONObj> object2 = boost::none, - boost::optional<repl::OpTime> opTime = boost::none, - OperationSessionInfo sessionInfo = {}, - boost::optional<repl::OpTime> prevOpTime = {}) { + static repl::OplogEntry makeOplogEntry( + repl::OpTypeEnum opType, + NamespaceString nss, + BSONObj object, + boost::optional<UUID> uuid = testUuid(), + boost::optional<bool> fromMigrate = boost::none, + boost::optional<BSONObj> object2 = boost::none, + boost::optional<repl::OpTime> opTime = boost::none, + OperationSessionInfo sessionInfo = {}, + boost::optional<repl::OpTime> prevOpTime = {}, + boost::optional<repl::OpTime> preImageOpTime = boost::none) { long long hash = 1LL; return repl::OplogEntry(opTime ? *opTime : kDefaultOpTime, // optime hash, // hash @@ -348,9 +384,9 @@ public: boost::none, // upsert Date_t(), // wall clock time boost::none, // statement id - prevOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none); // post-image optime + prevOpTime, // optime of previous write within same transaction + preImageOpTime, // pre-image optime + boost::none); // post-image optime } }; @@ -1966,6 +2002,282 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate); } +TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { + // Set the pre-image opTime to 1 second prior to the default event optime. + repl::OpTime preImageOpTime{Timestamp(kDefaultTs.getSecs() - 1, 1), 1}; + const auto preImageObj = BSON("_id" << 1 << "x" << 2); + + // The documentKey for the main change stream event. + const auto documentKey = BSON("_id" << 1); + + // The mock oplog UUID used by MockMongoInterface. + auto oplogUUID = MockMongoInterface::oplogUuid(); + + // Create an oplog entry for the pre-image no-op event. + auto preImageEntry = makeOplogEntry(OpTypeEnum::kNoop, + NamespaceString::kRsOplogNamespace, + preImageObj, // o + oplogUUID, // uuid + boost::none, // fromMigrate + boost::none, // o2 + preImageOpTime // opTime + ); + + // Create an oplog entry for the delete event that will look up the pre-image. + auto deleteEntry = makeOplogEntry(OpTypeEnum::kDelete, + nss, + documentKey, // o + testUuid(), // uuid + boost::none, // fromMigrate + boost::none, // o2 + kDefaultOpTime, // opTime + {}, // sessionInfo + {}, // prevOpTime + preImageOpTime // preImageOpTime + ); + + // Add the preImage oplog entry into a vector of documents that will be looked up. Add a dummy + // entry before it so that we know we are finding the pre-image based on the given timestamp. + repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm}; + std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()}, + Document{preImageEntry.toBSON()}}; + + // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available. + auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "off")); + Document expectedDeleteNoPreImage{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, documentKey}, + }; + checkTransformation( + deleteEntry, expectedDeleteNoPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "whenAvailable")); + Document expectedDeleteWithPreImage{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, documentKey}, + }; + checkTransformation( + deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + checkTransformation( + deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image, we see the event + // without the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "whenAvailable")); + checkTransformation(deleteEntry, expectedDeleteNoPreImage, {}, spec); + + // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the + // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, {}, spec), + AssertionException, + ErrorCodes::ChangeStreamHistoryLost); +} + +TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { + // Set the pre-image opTime to 1 second prior to the default event optime. + repl::OpTime preImageOpTime{Timestamp(kDefaultTs.getSecs() - 1, 1), 1}; + + // Define the pre-image object, the update operation spec, and the document key. + const auto updateSpec = BSON("$unset" << BSON("x" << 1)); + const auto preImageObj = BSON("_id" << 1 << "x" << 2); + const auto documentKey = BSON("_id" << 1); + + // The mock oplog UUID used by MockMongoInterface. + auto oplogUUID = MockMongoInterface::oplogUuid(); + + // Create an oplog entry for the pre-image no-op event. + auto preImageEntry = makeOplogEntry(OpTypeEnum::kNoop, + NamespaceString::kRsOplogNamespace, + preImageObj, // o + oplogUUID, // uuid + boost::none, // fromMigrate + boost::none, // o2 + preImageOpTime // opTime + ); + + // Create an oplog entry for the update event that will look up the pre-image. + auto updateEntry = makeOplogEntry(OpTypeEnum::kUpdate, + nss, + updateSpec, // o + testUuid(), // uuid + boost::none, // fromMigrate + documentKey, // o2 + kDefaultOpTime, // opTime + {}, // sessionInfo + {}, // prevOpTime + preImageOpTime // preImageOpTime + ); + + // Add the preImage oplog entry into a vector of documents that will be looked up. Add a dummy + // entry before it so that we know we are finding the pre-image based on the given timestamp. + repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm}; + std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()}, + Document{preImageEntry.toBSON()}}; + + // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available. + auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "off")); + Document expectedUpdateNoPreImage{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, documentKey}, + { + "updateDescription", + D{{"updatedFields", D{}}, {"removedFields", vector<V>{V("x"_sd)}}}, + }, + }; + checkTransformation( + updateEntry, expectedUpdateNoPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "whenAvailable")); + Document expectedUpdateWithPreImage{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, documentKey}, + { + "updateDescription", + D{{"updatedFields", D{}}, {"removedFields", vector<V>{V("x"_sd)}}}, + }, + }; + checkTransformation( + updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + checkTransformation( + updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image, we see the event + // without the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "whenAvailable")); + checkTransformation(updateEntry, expectedUpdateNoPreImage, {}, spec); + + // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the + // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, {}, spec), + AssertionException, + ErrorCodes::ChangeStreamHistoryLost); +} + +TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { + // Set the pre-image opTime to 1 second prior to the default event optime. + repl::OpTime preImageOpTime{Timestamp(kDefaultTs.getSecs() - 1, 1), 1}; + + // Define the pre-image object, the replacement document, and the document key. + const auto replacementDoc = BSON("_id" << 1 << "y" << 3); + const auto preImageObj = BSON("_id" << 1 << "x" << 2); + const auto documentKey = BSON("_id" << 1); + + // The mock oplog UUID used by MockMongoInterface. + auto oplogUUID = MockMongoInterface::oplogUuid(); + + // Create an oplog entry for the pre-image no-op event. + auto preImageEntry = makeOplogEntry(OpTypeEnum::kNoop, + NamespaceString::kRsOplogNamespace, + preImageObj, // o + oplogUUID, // uuid + boost::none, // fromMigrate + boost::none, // o2 + preImageOpTime // opTime + ); + + // Create an oplog entry for the replacement event that will look up the pre-image. + auto replaceEntry = makeOplogEntry(OpTypeEnum::kUpdate, + nss, + replacementDoc, // o + testUuid(), // uuid + boost::none, // fromMigrate + documentKey, // o2 + kDefaultOpTime, // opTime + {}, // sessionInfo + {}, // prevOpTime + preImageOpTime // preImageOpTime + ); + + // Add the preImage oplog entry into a vector of documents that will be looked up. Add a dummy + // entry before it so that we know we are finding the pre-image based on the given timestamp. + repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm}; + std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()}, + Document{preImageEntry.toBSON()}}; + + // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available. + auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "off")); + Document expectedReplaceNoPreImage{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kFullDocumentField, replacementDoc}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, documentKey}, + }; + checkTransformation( + replaceEntry, expectedReplaceNoPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "whenAvailable")); + Document expectedReplaceWithPreImage{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kFullDocumentField, replacementDoc}, + {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, documentKey}, + }; + checkTransformation( + replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + checkTransformation( + replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + + // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image, we see the event + // without the pre-image. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "whenAvailable")); + checkTransformation(replaceEntry, expectedReplaceNoPreImage, {}, spec); + + // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the + // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost. + spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, {}, spec), + AssertionException, + ErrorCodes::ChangeStreamHistoryLost); +} + TEST_F(ChangeStreamStageDBTest, MatchFiltersOperationsOnSystemCollections) { NamespaceString systemColl(nss.db() + ".system.users"); OplogEntry insert = makeOplogEntry(OpTypeEnum::kInsert, systemColl, BSON("_id" << 1)); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 20ceab62bd3..5c875c06741 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -92,6 +92,10 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), _changeStreamSpec); + // If the change stream spec requested a pre-image, make sure that we supply one. + _includePreImageOptime = + (spec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff); + // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. auto resumeAfter = spec.getResumeAfter(); @@ -188,6 +192,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document Value ns = input[repl::OplogEntry::kNssFieldName]; checkValueType(ns, repl::OplogEntry::kNssFieldName, BSONType::String); Value uuid = input[repl::OplogEntry::kUuidFieldName]; + Value preImageOpTime = input[repl::OplogEntry::kPreImageOpTimeFieldName]; std::vector<FieldPath> documentKeyFields; // Deal with CRUD operations and commands. @@ -338,7 +343,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document return doc.freeze(); } + // Add the post-image, pre-image, namespace, documentKey and other fields as appropriate. doc.addField(DocumentSourceChangeStream::kFullDocumentField, fullDocument); + if (_includePreImageOptime) { + // Set 'kFullDocumentBeforeChangeField' to the pre-image optime. The DSCSLookupPreImage + // stage will replace this optime with the actual pre-image taken from the oplog. + doc.addField(DocumentSourceChangeStream::kFullDocumentBeforeChangeField, preImageOpTime); + } doc.addField(DocumentSourceChangeStream::kNamespaceField, operationType == DocumentSourceChangeStream::kDropDatabaseOpType ? Value(Document{{"db", nss.db()}}) @@ -384,6 +395,9 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac deps->fields.insert(repl::OplogEntry::kSessionIdFieldName.toString()); deps->fields.insert(repl::OplogEntry::kTermFieldName.toString()); deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString()); + if (_includePreImageOptime) { + deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString()); + } return DepsTracker::State::EXHAUSTIVE_ALL; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index df8cecf49f0..693ddd6fa66 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -216,6 +216,9 @@ private: // Set to true if this transformation stage can be run on the collectionless namespace. bool _isIndependentOfAnyCollection; + // Set to true if the pre-image optime should be included in output documents. + bool _includePreImageOptime = false; + // '_fcv' is used to determine which version of the resume token to generate for each change. // This is a snapshot of what the feature compatibility version was at the time the stream was // opened or resumed. diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp new file mode 100644 index 00000000000..af670df9961 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.cpp @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" + +#include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/db/repl/local_oplog_info.h" +#include "mongo/db/transaction_history_iterator.h" +#include "mongo/util/intrusive_counter.h" +#include "mongo/util/log.h" + +namespace mongo { + +constexpr StringData DocumentSourceLookupChangePreImage::kStageName; +constexpr StringData DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName; + +boost::intrusive_ptr<DocumentSourceLookupChangePreImage> DocumentSourceLookupChangePreImage::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) { + auto mode = spec.getFullDocumentBeforeChange(); + + return make_intrusive<DocumentSourceLookupChangePreImage>(expCtx, mode); +} + +DocumentSource::GetNextResult DocumentSourceLookupChangePreImage::doGetNext() { + auto input = pSource->getNext(); + if (!input.isAdvanced()) { + return input; + } + + // If this is not an update, replace or delete, then just pass along the result. + const auto kOpTypeField = DocumentSourceChangeStream::kOperationTypeField; + const auto opType = input.getDocument()[kOpTypeField]; + DocumentSourceChangeStream::checkValueType(opType, kOpTypeField, BSONType::String); + if (opType.getStringData() != DocumentSourceChangeStream::kUpdateOpType && + opType.getStringData() != DocumentSourceChangeStream::kReplaceOpType && + opType.getStringData() != DocumentSourceChangeStream::kDeleteOpType) { + return input; + } + + // If a pre-image is available, the transform stage will have populated it in the event's + // 'fullDocumentBeforeChange' field. If this field is missing and the pre-imaging mode is + // 'required', we throw an exception. Otherwise, we pass along the document unmodified. + auto preImageOpTimeVal = input.getDocument()[kFullDocumentBeforeChangeFieldName]; + if (preImageOpTimeVal.missing()) { + uassert(51770, + str::stream() + << "Change stream was configured to require a pre-image for all update, delete " + "and replace events, but no pre-image optime was recorded for event: " + << input.getDocument().toString(), + _fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kRequired); + return input; + } + + // Look up the pre-image using the optime. This may return boost::none if it was not found. + auto preImageOpTime = repl::OpTime::parse(preImageOpTimeVal.getDocument().toBson()); + auto preImageDoc = lookupPreImage(input.getDocument(), preImageOpTime); + + // Even if no pre-image was found, we have to replace the 'fullDocumentBeforeChange' field. + MutableDocument outputDoc(input.releaseDocument()); + outputDoc[kFullDocumentBeforeChangeFieldName] = (preImageDoc ? Value(*preImageDoc) : Value()); + + return outputDoc.freeze(); +} + +boost::optional<Document> DocumentSourceLookupChangePreImage::lookupPreImage( + const Document& inputDoc, const repl::OpTime& opTime) const { + // We need the oplog's UUID for lookup, so obtain the collection info via MongoProcessInterface. + auto localOplogInfo = pExpCtx->mongoProcessInterface->getCollectionOptions( + pExpCtx->opCtx, NamespaceString::kRsOplogNamespace); + + // Extract the UUID from the collection information. We should always have a valid uuid here. + auto oplogUUID = invariantStatusOK(UUID::parse(localOplogInfo["uuid"])); + + // Look up the pre-image oplog entry using the opTime as the query filter. + auto lookedUpDoc = + pExpCtx->mongoProcessInterface->lookupSingleDocument(pExpCtx, + NamespaceString::kRsOplogNamespace, + oplogUUID, + Document{opTime.asQuery()}, + boost::none); + + // Failing to find an oplog entry implies that the pre-image has rolled off the oplog. This is + // acceptable if the mode is "kWhenAvailable", but not if the mode is "kRequired". + if (!lookedUpDoc) { + uassert( + ErrorCodes::ChangeStreamHistoryLost, + str::stream() + << "Change stream was configured to require a pre-image for all update, delete and " + "replace events, but the pre-image was not found in the oplog for event: " + << inputDoc.toString(), + _fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kRequired); + + // Return boost::none to signify that we (legally) failed to find the pre-image. + return boost::none; + } + + // If we had an optime to look up, and we found an oplog entry with that timestamp, then we + // should always have a valid no-op entry containing a valid, non-empty pre-image document. + auto opLogEntry = invariantStatusOK(repl::OplogEntry::parse(lookedUpDoc->toBson())); + invariant(opLogEntry.getOpType() == repl::OpTypeEnum::kNoop); + invariant(!opLogEntry.getObject().isEmpty()); + + return Document{opLogEntry.getObject().getOwned()}; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h new file mode 100644 index 00000000000..eea42b816fb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_lookup_change_pre_image.h @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" + +namespace mongo { + +/** + * Part of the change stream API machinery used to look up the pre-image of a document. + * + * After a document that should have its pre-image included is transformed from the oplog, + * its "fullDocumentBeforeChange" field shall be the optime of the noop oplog entry containing the + * pre-image. This stage replaces that field with the actual pre-image document. + */ +class DocumentSourceLookupChangePreImage final : public DocumentSource { +public: + static constexpr StringData kStageName = "$_internalLookupChangePreImage"_sd; + static constexpr StringData kFullDocumentBeforeChangeFieldName = + DocumentSourceChangeStream::kFullDocumentBeforeChangeField; + + /** + * Creates a DocumentSourceLookupChangePostImage stage. + */ + static boost::intrusive_ptr<DocumentSourceLookupChangePreImage> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); + + DocumentSourceLookupChangePreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx, + FullDocumentBeforeChangeModeEnum mode) + : DocumentSource(kStageName, expCtx), _fullDocumentBeforeChangeMode(mode) { + // This stage should never be created with FullDocumentBeforeChangeMode::kOff. + invariant(_fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kOff); + } + + /** + * Only modifies a single path: "fullDocumentBeforeChange". + */ + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, + {kFullDocumentBeforeChangeFieldName.toString()}, + {}}; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + invariant(pipeState != Pipeline::SplitState::kSplitForShards); + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kChangeStreamStage); + constraints.canSwapWithMatch = true; + return constraints; + } + + // Conceptually, this stage must always run on the shards part of the pipeline. At + // present, however, pre-image retrieval is not supported in a sharded cluster, and + // so the distributed plan logic will not be used. + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + return DistributedPlanLogic{this, nullptr, boost::none}; + } + + DepsTracker::State getDependencies(DepsTracker* deps) const { + deps->fields.insert(DocumentSourceChangeStream::kFullDocumentBeforeChangeField.toString()); + // This stage does not restrict the output fields to a finite set, and has no impact on + // whether metadata is available or needed. + return DepsTracker::State::SEE_NEXT; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + return (explain ? Value{Document{{kStageName, Document()}}} : Value()); + } + + const char* getSourceName() const final { + return kStageName.rawData(); + } + +private: + /** + * Performs the lookup to retrieve the full pre-image document for applicable operations. + */ + GetNextResult doGetNext() final; + + /** + * Looks up and returns a pre-image document at the specified opTime in the oplog. Returns + * boost::none if the mode is "kWhenAvailable" and no such oplog entry was found. Throws if the + * pre-image mode is "kRequired" and no entry was found. Invariants that if an oplog entry with + * the given opTime is found, it is a no-op entry with a valid non-empty pre-image document. + */ + boost::optional<Document> lookupPreImage(const Document& inputDoc, + const repl::OpTime& opTime) const; + + // Determines whether pre-images are strictly required or may be included only when available. + FullDocumentBeforeChangeModeEnum _fullDocumentBeforeChangeMode = + FullDocumentBeforeChangeModeEnum::kOff; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 5cc39e890bf..39276c064d4 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -45,6 +45,7 @@ #include "mongo/db/pipeline/document_source_internal_split_pipeline.h" #include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_source_out.h" @@ -1708,6 +1709,58 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline->getSources().back().get())); } +TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependentMatch) { + QueryTestServiceContext testServiceContext; + auto opCtx = testServiceContext.makeOperationContext(); + + intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); + expCtx->opCtx = opCtx.get(); + expCtx->uuid = UUID::gen(); + setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); + + auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); + ASSERT_EQ(stages.size(), 5UL); + // Make sure the pre-image lookup is at the end. + ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get())); + + auto matchPredicate = BSON("extra" + << "predicate"); + stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx)); + auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + pipeline->optimizePipeline(); + + // Make sure the $match stage has swapped before the change look up. + ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(pipeline->getSources().back().get())); +} + +TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPreImage) { + QueryTestServiceContext testServiceContext; + auto opCtx = testServiceContext.makeOperationContext(); + + intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); + expCtx->opCtx = opCtx.get(); + expCtx->uuid = UUID::gen(); + setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); + + auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" + << "required")); + auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx); + ASSERT_EQ(stages.size(), 5UL); + // Make sure the pre-image lookup is at the end. + ASSERT(dynamic_cast<DocumentSourceLookupChangePreImage*>(stages.back().get())); + + stages.push_back(DocumentSourceMatch::create( + BSON(DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName << BSONNULL), + expCtx)); + auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + pipeline->optimizePipeline(); + + // Make sure the $match stage stays at the end. + ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline->getSources().back().get())); +} + TEST(PipelineOptimizationTest, SortLimProjLimBecomesTopKSortProj) { std::string inputPipe = "[{$sort: {a: 1}}" diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index 481aff7c6ad..2503a989d74 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -633,6 +633,16 @@ Mongo.prototype._extractChangeStreamOptions = function(options) { delete options.startAtOperationTime; } + if (options.hasOwnProperty("fullDocumentBeforeChange")) { + changeStreamOptions.fullDocumentBeforeChange = options.fullDocumentBeforeChange; + delete options.fullDocumentBeforeChange; + } + + if (options.hasOwnProperty("allChangesForCluster")) { + changeStreamOptions.allChangesForCluster = options.allChangesForCluster; + delete options.allChangesForCluster; + } + return [{$changeStream: changeStreamOptions}, options]; }; |