summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDrew Paroski <drew.paroski@mongodb.com>2022-01-31 19:56:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-22 18:32:58 +0000
commitcd6b3e618c5d2988d0131ce81baec018eee79767 (patch)
tree93584fed9b411a7472262f85a97696943ed7658a
parent16acd82faa5c42a8447619b1cb18e5da341185c8 (diff)
downloadmongo-cd6b3e618c5d2988d0131ce81baec018eee79767.tar.gz
SERVER-61894 Support the 'showRawUpdateDescription' option in change stream specs
(cherry picked from commit 8ff9eb25cd17af72f7d9d3a15418c069eee1ae9d) (cherry picked from commit b156a5b704741985498482166e90c18ca9b03b95) (cherry picked from commit dea572279c27e7096a319568fb192940f8de9b59)
-rw-r--r--etc/backports_required_for_multiversion_tests.yml2
-rw-r--r--jstests/change_streams/show_raw_update_description_v1_oplog.js155
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.idl9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp46
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h4
-rw-r--r--src/mongo/shell/mongo.js5
8 files changed, 213 insertions, 17 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 003df21a7f1..b37d69ed182 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -138,6 +138,8 @@ all:
test_file: jstests/aggregation/lookup_let_optimization.js
- ticket: SERVER-62272
test_file: jstests/sharding/chunk_migration_with_schema_validation.js
+ - ticket: SERVER-61894
+ test_file: jstests/change_streams/show_raw_update_description_v1_oplog.js
# Tests that should only be excluded from particular suites should be listed under that suite.
diff --git a/jstests/change_streams/show_raw_update_description_v1_oplog.js b/jstests/change_streams/show_raw_update_description_v1_oplog.js
new file mode 100644
index 00000000000..cc4595883fd
--- /dev/null
+++ b/jstests/change_streams/show_raw_update_description_v1_oplog.js
@@ -0,0 +1,155 @@
+/**
+ * Tests that change streams with the 'showRawUpdateDescription' option enabled will return update
+ * events with the 'rawUpdateDescription' field instead of the 'updateDescription' field, and tests
+ * that the 'showRawUpdateDescription' option has no effect on replacements or other types of
+ * events.
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+
+// Drop and recreate the collections to be used in this set of tests.
+assertDropAndRecreateCollection(db, "t1");
+
+assert.commandWorked(db.t1.insert([
+ {_id: 3, a: 5, b: 1},
+ {_id: 4, a: 0, b: 1},
+ {_id: 5, a: 0, b: 1},
+ {_id: 6, a: 1, b: 1},
+ {_id: 7, a: 1, b: 1},
+ {_id: 8, a: 2, b: {c: 1}}
+]));
+
+const cst = new ChangeStreamTest(db);
+let cursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {showRawUpdateDescription: true}}], collection: db.t1});
+
+//
+// Test insert, replace, and delete operations and verify the corresponding change stream events
+// are unaffected by the 'showRawUpdateDescription' option.
+//
+jsTestLog("Testing insert");
+assert.commandWorked(db.t1.insert({_id: 1, a: 1}));
+let expected = {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 1},
+ ns: {db: "test", coll: "t1"},
+ operationType: "insert",
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+jsTestLog("Testing upsert");
+assert.commandWorked(db.t1.update({_id: 2}, {_id: 2, a: 4}, {upsert: true}));
+expected = {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, a: 4},
+ ns: {db: "test", coll: "t1"},
+ operationType: "insert",
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+jsTestLog("Testing replacement");
+assert.commandWorked(db.t1.update({_id: 1}, {_id: 1, a: 3}));
+expected = {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 3},
+ ns: {db: "test", coll: "t1"},
+ operationType: "replace",
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+jsTestLog("Testing another replacement");
+assert.commandWorked(db.t1.update({_id: 1}, {_id: 1, b: 3}));
+expected = {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, b: 3},
+ ns: {db: "test", coll: "t1"},
+ operationType: "replace",
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+jsTestLog("Testing delete");
+assert.commandWorked(db.t1.remove({_id: 1}));
+expected = {
+ documentKey: {_id: 1},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+jsTestLog("Testing delete with justOne:false");
+assert.commandWorked(db.t1.remove({a: 1}, {justOne: false}));
+expected = [
+ {
+ documentKey: {_id: 6},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+ },
+ {
+ documentKey: {_id: 7},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+ }
+];
+cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected});
+
+//
+// The remainder of the test-cases below exercise various update scenarios that produce
+// 'rawUpdateDescription'.
+//
+
+//
+// Test op-style updates.
+//
+jsTestLog("Testing op-style update with $inc");
+assert.commandWorked(db.t1.update({_id: 3}, {$inc: {b: 2}}));
+expected = {
+ documentKey: {_id: 3},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ rawUpdateDescription: {"$v": 1, "$set": {b: 3}}
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+jsTestLog("Testing op-style update with $set and multi:true");
+assert.commandWorked(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true}));
+expected = [
+ {
+ documentKey: {_id: 4},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ rawUpdateDescription: {"$v": 1, "$set": {b: 2}}
+ },
+ {
+ documentKey: {_id: 5},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ rawUpdateDescription: {"$v": 1, "$set": {b: 2}}
+ }
+];
+cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected});
+
+jsTestLog("Testing op-style update with $unset");
+assert.commandWorked(db.t1.update({_id: 3}, {$unset: {b: ""}}));
+expected = {
+ documentKey: {_id: 3},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ rawUpdateDescription: {"$v": 1, "$unset": {b: true}}
+};
+cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected});
+
+jsTestLog("Testing op-style update with $set on nested field");
+assert.commandWorked(db.t1.update({_id: 8}, {$set: {"b.d": 2}}));
+expected = {
+ documentKey: {_id: 8},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ rawUpdateDescription: {"$v": 1, "$set": {"b.d": 2}}
+};
+cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+cst.cleanUp();
+}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 82a1b177e92..b0d27a5eda4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -77,6 +77,7 @@ constexpr StringData DocumentSourceChangeStream::kIdField;
constexpr StringData DocumentSourceChangeStream::kNamespaceField;
constexpr StringData DocumentSourceChangeStream::kUuidField;
constexpr StringData DocumentSourceChangeStream::kUpdateDescriptionField;
+constexpr StringData DocumentSourceChangeStream::kRawUpdateDescriptionField;
constexpr StringData DocumentSourceChangeStream::kOperationTypeField;
constexpr StringData DocumentSourceChangeStream::kStageName;
constexpr StringData DocumentSourceChangeStream::kClusterTimeField;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 690cd4512e2..1baa2ac7a75 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -112,9 +112,15 @@ public:
static constexpr StringData kNamespaceField = "ns"_sd;
// Name of the field which stores information about updates. Only applies when OperationType
- // is "update".
+ // is "update". Note that this field will be omitted if the 'showRawUpdateDescription' option
+ // is enabled in the change stream spec.
static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd;
+ // Name of the field which stores the raw update description from the oplog about updates.
+ // Only applies when OperationType is "update". Note that this field is only present when
+ // the 'showRawUpdateDescription' option is enabled in the change stream spec.
+ static constexpr StringData kRawUpdateDescriptionField = "rawUpdateDescription"_sd;
+
// The name of the subfield of '_id' where the UUID of the namespace will be located after the
// transformation.
static constexpr StringData kUuidField = "uuid"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index 93a92c25173..85136eec7d1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -109,3 +109,12 @@ structs:
deletes may appear that do not reflect actual deletions or insertions
of data. Instead they reflect this data moving from one shard to
another.
+
+ showRawUpdateDescription:
+ cpp_name: showRawUpdateDescription
+ type: optionalBool
+ description: An internal flag indicating whether each update event in a change
+ stream should contain a "rawUpdateDescription" field that exposes the
+ raw update description from the oplog entry for the corresponding
+ update command. When this flag is set to true, the standard
+ "updateDescription" field in update events will be omitted.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index e055104b91a..6a88ccbcae3 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -92,6 +92,9 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
_changeStreamSpec);
+ // If the change stream spec requested raw update descriptions, make sure we honor that request.
+ _showRawUpdateDescription = spec.getShowRawUpdateDescription();
+
// If the change stream spec includes a resumeToken with a shard key, populate the document key
// cache with the field paths.
auto resumeAfter = spec.getResumeAfter();
@@ -246,21 +249,28 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
checkValueType(input[repl::OplogEntry::kObjectFieldName],
repl::OplogEntry::kObjectFieldName,
BSONType::Object);
- Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
- Value updatedFields = opObject["$set"];
- Value removedFields = opObject["$unset"];
-
- // Extract the field names of $unset document.
- vector<Value> removedFieldsVector;
- if (removedFields.getType() == BSONType::Object) {
- auto iter = removedFields.getDocument().fieldIterator();
- while (iter.more()) {
- removedFieldsVector.push_back(Value(iter.next().first));
+
+ if (_showRawUpdateDescription) {
+ updateDescription = input[repl::OplogEntry::kObjectFieldName];
+ } else {
+ Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
+ Value updatedFields = opObject["$set"];
+ Value removedFields = opObject["$unset"];
+
+ // Extract the field names of $unset document.
+ vector<Value> removedFieldsVector;
+ if (removedFields.getType() == BSONType::Object) {
+ auto iter = removedFields.getDocument().fieldIterator();
+ while (iter.more()) {
+ removedFieldsVector.push_back(Value(iter.next().first));
+ }
}
+
+ updateDescription = Value(
+ Document{{"updatedFields",
+ updatedFields.missing() ? Value(Document()) : updatedFields},
+ {"removedFields", removedFieldsVector}});
}
- updateDescription = Value(Document{
- {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields},
- {"removedFields", removedFieldsVector}});
} else {
operationType = DocumentSourceChangeStream::kReplaceOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
@@ -359,9 +369,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
: Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey);
- // Note that 'updateDescription' might be the 'missing' value, in which case it will not be
- // serialized.
- doc.addField("updateDescription", updateDescription);
+ // Note that the update description field might be the 'missing' value, in which case it will
+ // not be serialized.
+ auto updateDescriptionFieldName = _showRawUpdateDescription
+ ? DocumentSourceChangeStream::kRawUpdateDescriptionField
+ : DocumentSourceChangeStream::kUpdateDescriptionField;
+ doc.addField(updateDescriptionFieldName, updateDescription);
+
return doc.freeze();
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index c20f5864e67..15c2200358d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -213,6 +213,10 @@ private:
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
+ // Set to true if update events should contain the "rawUpdateDescription" field. When this flag
+ // is set to true, the standard "updateDescription" field in update events will be omitted.
+ bool _showRawUpdateDescription = false;
+
// '_fcv' is used to determine which version of the resume token to generate for each change.
// This is a snapshot of what the feature compatibility version was at the time the stream was
// opened or resumed.
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index f7ac30239ea..e01bd4ca084 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -645,6 +645,11 @@ Mongo.prototype._extractChangeStreamOptions = function(options) {
delete options.startAtOperationTime;
}
+ if (options.hasOwnProperty("showRawUpdateDescription")) {
+ changeStreamOptions.showRawUpdateDescription = options.showRawUpdateDescription;
+ delete options.showRawUpdateDescription;
+ }
+
return [{$changeStream: changeStreamOptions}, options];
};