summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.idl9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp46
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h4
5 files changed, 51 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 82a1b177e92..b0d27a5eda4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -77,6 +77,7 @@ constexpr StringData DocumentSourceChangeStream::kIdField;
constexpr StringData DocumentSourceChangeStream::kNamespaceField;
constexpr StringData DocumentSourceChangeStream::kUuidField;
constexpr StringData DocumentSourceChangeStream::kUpdateDescriptionField;
+constexpr StringData DocumentSourceChangeStream::kRawUpdateDescriptionField;
constexpr StringData DocumentSourceChangeStream::kOperationTypeField;
constexpr StringData DocumentSourceChangeStream::kStageName;
constexpr StringData DocumentSourceChangeStream::kClusterTimeField;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 690cd4512e2..1baa2ac7a75 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -112,9 +112,15 @@ public:
static constexpr StringData kNamespaceField = "ns"_sd;
// Name of the field which stores information about updates. Only applies when OperationType
- // is "update".
+ // is "update". Note that this field will be omitted if the 'showRawUpdateDescription' option
+ // is enabled in the change stream spec.
static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd;
+ // Name of the field which stores the raw update description from the oplog about updates.
+ // Only applies when OperationType is "update". Note that this field is only present when
+ // the 'showRawUpdateDescription' option is enabled in the change stream spec.
+ static constexpr StringData kRawUpdateDescriptionField = "rawUpdateDescription"_sd;
+
// The name of the subfield of '_id' where the UUID of the namespace will be located after the
// transformation.
static constexpr StringData kUuidField = "uuid"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index 93a92c25173..85136eec7d1 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -109,3 +109,12 @@ structs:
deletes may appear that do not reflect actual deletions or insertions
of data. Instead they reflect this data moving from one shard to
another.
+
+ showRawUpdateDescription:
+ cpp_name: showRawUpdateDescription
+ type: optionalBool
+ description: An internal flag indicating whether each update event in a change
+ stream should contain a "rawUpdateDescription" field that exposes the
+ raw update description from the oplog entry for the corresponding
+ update command. When this flag is set to true, the standard
+ "updateDescription" field in update events will be omitted.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index e055104b91a..6a88ccbcae3 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -92,6 +92,9 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
_changeStreamSpec);
+ // If the change stream spec requested raw update descriptions, make sure we honor that request.
+ _showRawUpdateDescription = spec.getShowRawUpdateDescription();
+
// If the change stream spec includes a resumeToken with a shard key, populate the document key
// cache with the field paths.
auto resumeAfter = spec.getResumeAfter();
@@ -246,21 +249,28 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
checkValueType(input[repl::OplogEntry::kObjectFieldName],
repl::OplogEntry::kObjectFieldName,
BSONType::Object);
- Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
- Value updatedFields = opObject["$set"];
- Value removedFields = opObject["$unset"];
-
- // Extract the field names of $unset document.
- vector<Value> removedFieldsVector;
- if (removedFields.getType() == BSONType::Object) {
- auto iter = removedFields.getDocument().fieldIterator();
- while (iter.more()) {
- removedFieldsVector.push_back(Value(iter.next().first));
+
+ if (_showRawUpdateDescription) {
+ updateDescription = input[repl::OplogEntry::kObjectFieldName];
+ } else {
+ Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
+ Value updatedFields = opObject["$set"];
+ Value removedFields = opObject["$unset"];
+
+ // Extract the field names of $unset document.
+ vector<Value> removedFieldsVector;
+ if (removedFields.getType() == BSONType::Object) {
+ auto iter = removedFields.getDocument().fieldIterator();
+ while (iter.more()) {
+ removedFieldsVector.push_back(Value(iter.next().first));
+ }
}
+
+ updateDescription = Value(
+ Document{{"updatedFields",
+ updatedFields.missing() ? Value(Document()) : updatedFields},
+ {"removedFields", removedFieldsVector}});
}
- updateDescription = Value(Document{
- {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields},
- {"removedFields", removedFieldsVector}});
} else {
operationType = DocumentSourceChangeStream::kReplaceOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
@@ -359,9 +369,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
: Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey);
- // Note that 'updateDescription' might be the 'missing' value, in which case it will not be
- // serialized.
- doc.addField("updateDescription", updateDescription);
+ // Note that the update description field might be the 'missing' value, in which case it will
+ // not be serialized.
+ auto updateDescriptionFieldName = _showRawUpdateDescription
+ ? DocumentSourceChangeStream::kRawUpdateDescriptionField
+ : DocumentSourceChangeStream::kUpdateDescriptionField;
+ doc.addField(updateDescriptionFieldName, updateDescription);
+
return doc.freeze();
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index c20f5864e67..15c2200358d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -213,6 +213,10 @@ private:
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
+ // Set to true if update events should contain the "rawUpdateDescription" field. When this flag
+ // is set to true, the standard "updateDescription" field in update events will be omitted.
+ bool _showRawUpdateDescription = false;
+
// '_fcv' is used to determine which version of the resume token to generate for each change.
// This is a snapshot of what the feature compatibility version was at the time the stream was
// opened or resumed.