diff options
author | Drew Paroski <drew.paroski@mongodb.com> | 2022-01-31 19:56:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-22 18:32:58 +0000 |
commit | cd6b3e618c5d2988d0131ce81baec018eee79767 (patch) | |
tree | 93584fed9b411a7472262f85a97696943ed7658a | |
parent | 16acd82faa5c42a8447619b1cb18e5da341185c8 (diff) | |
download | mongo-cd6b3e618c5d2988d0131ce81baec018eee79767.tar.gz |
SERVER-61894 Support the 'showRawUpdateDescription' option in change stream specs
(cherry picked from commit 8ff9eb25cd17af72f7d9d3a15418c069eee1ae9d)
(cherry picked from commit b156a5b704741985498482166e90c18ca9b03b95)
(cherry picked from commit dea572279c27e7096a319568fb192940f8de9b59)
8 files changed, 213 insertions, 17 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 003df21a7f1..b37d69ed182 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -138,6 +138,8 @@ all: test_file: jstests/aggregation/lookup_let_optimization.js - ticket: SERVER-62272 test_file: jstests/sharding/chunk_migration_with_schema_validation.js + - ticket: SERVER-61894 + test_file: jstests/change_streams/show_raw_update_description_v1_oplog.js # Tests that should only be excluded from particular suites should be listed under that suite. diff --git a/jstests/change_streams/show_raw_update_description_v1_oplog.js b/jstests/change_streams/show_raw_update_description_v1_oplog.js new file mode 100644 index 00000000000..cc4595883fd --- /dev/null +++ b/jstests/change_streams/show_raw_update_description_v1_oplog.js @@ -0,0 +1,155 @@ +/** + * Tests that change streams with the 'showRawUpdateDescription' option enabled will return update + * events with the 'rawUpdateDescription' field instead of the 'updateDescription' field, and tests + * that the 'showRawUpdateDescription' option has no effect on replacements or other types of + * events. + */ +(function() { +"use strict"; + +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. + +// Drop and recreate the collections to be used in this set of tests. +assertDropAndRecreateCollection(db, "t1"); + +assert.commandWorked(db.t1.insert([ + {_id: 3, a: 5, b: 1}, + {_id: 4, a: 0, b: 1}, + {_id: 5, a: 0, b: 1}, + {_id: 6, a: 1, b: 1}, + {_id: 7, a: 1, b: 1}, + {_id: 8, a: 2, b: {c: 1}} +])); + +const cst = new ChangeStreamTest(db); +let cursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: {showRawUpdateDescription: true}}], collection: db.t1}); + +// +// Test insert, replace, and delete operations and verify the corresponding change stream events +// are unaffected by the 'showRawUpdateDescription' option. +// +jsTestLog("Testing insert"); +assert.commandWorked(db.t1.insert({_id: 1, a: 1})); +let expected = { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 1}, + ns: {db: "test", coll: "t1"}, + operationType: "insert", +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +jsTestLog("Testing upsert"); +assert.commandWorked(db.t1.update({_id: 2}, {_id: 2, a: 4}, {upsert: true})); +expected = { + documentKey: {_id: 2}, + fullDocument: {_id: 2, a: 4}, + ns: {db: "test", coll: "t1"}, + operationType: "insert", +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +jsTestLog("Testing replacement"); +assert.commandWorked(db.t1.update({_id: 1}, {_id: 1, a: 3})); +expected = { + documentKey: {_id: 1}, + fullDocument: {_id: 1, a: 3}, + ns: {db: "test", coll: "t1"}, + operationType: "replace", +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +jsTestLog("Testing another replacement"); +assert.commandWorked(db.t1.update({_id: 1}, {_id: 1, b: 3})); +expected = { + documentKey: {_id: 1}, + fullDocument: {_id: 1, b: 3}, + ns: {db: "test", coll: "t1"}, + operationType: "replace", +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +jsTestLog("Testing delete"); +assert.commandWorked(db.t1.remove({_id: 1})); +expected = { + documentKey: {_id: 1}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +jsTestLog("Testing delete with justOne:false"); +assert.commandWorked(db.t1.remove({a: 1}, {justOne: false})); +expected = [ + { + documentKey: {_id: 6}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", + }, + { + documentKey: {_id: 7}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", + } +]; +cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}); + +// +// The remainder of the test-cases below exercise various update scenarios that produce +// 'rawUpdateDescription'. +// + +// +// Test op-style updates. +// +jsTestLog("Testing op-style update with $inc"); +assert.commandWorked(db.t1.update({_id: 3}, {$inc: {b: 2}})); +expected = { + documentKey: {_id: 3}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + rawUpdateDescription: {"$v": 1, "$set": {b: 3}} +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +jsTestLog("Testing op-style update with $set and multi:true"); +assert.commandWorked(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true})); +expected = [ + { + documentKey: {_id: 4}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + rawUpdateDescription: {"$v": 1, "$set": {b: 2}} + }, + { + documentKey: {_id: 5}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + rawUpdateDescription: {"$v": 1, "$set": {b: 2}} + } +]; +cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}); + +jsTestLog("Testing op-style update with $unset"); +assert.commandWorked(db.t1.update({_id: 3}, {$unset: {b: ""}})); +expected = { + documentKey: {_id: 3}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + rawUpdateDescription: {"$v": 1, "$unset": {b: true}} +}; +cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}); + +jsTestLog("Testing op-style update with $set on nested field"); +assert.commandWorked(db.t1.update({_id: 8}, {$set: {"b.d": 2}})); +expected = { + documentKey: {_id: 8}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + rawUpdateDescription: {"$v": 1, "$set": {"b.d": 2}} +}; +cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + +cst.cleanUp(); +}()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 82a1b177e92..b0d27a5eda4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -77,6 +77,7 @@ constexpr StringData DocumentSourceChangeStream::kIdField; constexpr StringData DocumentSourceChangeStream::kNamespaceField; constexpr StringData DocumentSourceChangeStream::kUuidField; constexpr StringData DocumentSourceChangeStream::kUpdateDescriptionField; +constexpr StringData DocumentSourceChangeStream::kRawUpdateDescriptionField; constexpr StringData DocumentSourceChangeStream::kOperationTypeField; constexpr StringData DocumentSourceChangeStream::kStageName; constexpr StringData DocumentSourceChangeStream::kClusterTimeField; diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 690cd4512e2..1baa2ac7a75 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -112,9 +112,15 @@ public: static constexpr StringData kNamespaceField = "ns"_sd; // Name of the field which stores information about updates. Only applies when OperationType - // is "update". + // is "update". Note that this field will be omitted if the 'showRawUpdateDescription' option + // is enabled in the change stream spec. static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd; + // Name of the field which stores the raw update description from the oplog about updates. + // Only applies when OperationType is "update". Note that this field is only present when + // the 'showRawUpdateDescription' option is enabled in the change stream spec. + static constexpr StringData kRawUpdateDescriptionField = "rawUpdateDescription"_sd; + // The name of the subfield of '_id' where the UUID of the namespace will be located after the // transformation. static constexpr StringData kUuidField = "uuid"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index 93a92c25173..85136eec7d1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -109,3 +109,12 @@ structs: deletes may appear that do not reflect actual deletions or insertions of data. Instead they reflect this data moving from one shard to another. + + showRawUpdateDescription: + cpp_name: showRawUpdateDescription + type: optionalBool + description: An internal flag indicating whether each update event in a change + stream should contain a "rawUpdateDescription" field that exposes the + raw update description from the oplog entry for the corresponding + update command. When this flag is set to true, the standard + "updateDescription" field in update events will be omitted. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index e055104b91a..6a88ccbcae3 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -92,6 +92,9 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), _changeStreamSpec); + // If the change stream spec requested raw update descriptions, make sure we honor that request. + _showRawUpdateDescription = spec.getShowRawUpdateDescription(); + // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. auto resumeAfter = spec.getResumeAfter(); @@ -246,21 +249,28 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document checkValueType(input[repl::OplogEntry::kObjectFieldName], repl::OplogEntry::kObjectFieldName, BSONType::Object); - Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); - Value updatedFields = opObject["$set"]; - Value removedFields = opObject["$unset"]; - - // Extract the field names of $unset document. - vector<Value> removedFieldsVector; - if (removedFields.getType() == BSONType::Object) { - auto iter = removedFields.getDocument().fieldIterator(); - while (iter.more()) { - removedFieldsVector.push_back(Value(iter.next().first)); + + if (_showRawUpdateDescription) { + updateDescription = input[repl::OplogEntry::kObjectFieldName]; + } else { + Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); + Value updatedFields = opObject["$set"]; + Value removedFields = opObject["$unset"]; + + // Extract the field names of $unset document. + vector<Value> removedFieldsVector; + if (removedFields.getType() == BSONType::Object) { + auto iter = removedFields.getDocument().fieldIterator(); + while (iter.more()) { + removedFieldsVector.push_back(Value(iter.next().first)); + } } + + updateDescription = Value( + Document{{"updatedFields", + updatedFields.missing() ? Value(Document()) : updatedFields}, + {"removedFields", removedFieldsVector}}); } - updateDescription = Value(Document{ - {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, - {"removedFields", removedFieldsVector}}); } else { operationType = DocumentSourceChangeStream::kReplaceOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; @@ -359,9 +369,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document : Value(Document{{"db", nss.db()}, {"coll", nss.coll()}})); doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey); - // Note that 'updateDescription' might be the 'missing' value, in which case it will not be - // serialized. - doc.addField("updateDescription", updateDescription); + // Note that the update description field might be the 'missing' value, in which case it will + // not be serialized. + auto updateDescriptionFieldName = _showRawUpdateDescription + ? DocumentSourceChangeStream::kRawUpdateDescriptionField + : DocumentSourceChangeStream::kUpdateDescriptionField; + doc.addField(updateDescriptionFieldName, updateDescription); + return doc.freeze(); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index c20f5864e67..15c2200358d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -213,6 +213,10 @@ private: // Set to true if this transformation stage can be run on the collectionless namespace. bool _isIndependentOfAnyCollection; + // Set to true if update events should contain the "rawUpdateDescription" field. When this flag + // is set to true, the standard "updateDescription" field in update events will be omitted. + bool _showRawUpdateDescription = false; + // '_fcv' is used to determine which version of the resume token to generate for each change. // This is a snapshot of what the feature compatibility version was at the time the stream was // opened or resumed. diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index f7ac30239ea..e01bd4ca084 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -645,6 +645,11 @@ Mongo.prototype._extractChangeStreamOptions = function(options) { delete options.startAtOperationTime; } + if (options.hasOwnProperty("showRawUpdateDescription")) { + changeStreamOptions.showRawUpdateDescription = options.showRawUpdateDescription; + delete options.showRawUpdateDescription; + } + return [{$changeStream: changeStreamOptions}, options]; }; |