diff options
Diffstat (limited to 'src/mongo/db')
5 files changed, 51 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 82a1b177e92..b0d27a5eda4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -77,6 +77,7 @@ constexpr StringData DocumentSourceChangeStream::kIdField; constexpr StringData DocumentSourceChangeStream::kNamespaceField; constexpr StringData DocumentSourceChangeStream::kUuidField; constexpr StringData DocumentSourceChangeStream::kUpdateDescriptionField; +constexpr StringData DocumentSourceChangeStream::kRawUpdateDescriptionField; constexpr StringData DocumentSourceChangeStream::kOperationTypeField; constexpr StringData DocumentSourceChangeStream::kStageName; constexpr StringData DocumentSourceChangeStream::kClusterTimeField; diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 690cd4512e2..1baa2ac7a75 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -112,9 +112,15 @@ public: static constexpr StringData kNamespaceField = "ns"_sd; // Name of the field which stores information about updates. Only applies when OperationType - // is "update". + // is "update". Note that this field will be omitted if the 'showRawUpdateDescription' option + // is enabled in the change stream spec. static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd; + // Name of the field which stores the raw update description from the oplog about updates. + // Only applies when OperationType is "update". Note that this field is only present when + // the 'showRawUpdateDescription' option is enabled in the change stream spec. + static constexpr StringData kRawUpdateDescriptionField = "rawUpdateDescription"_sd; + // The name of the subfield of '_id' where the UUID of the namespace will be located after the // transformation. static constexpr StringData kUuidField = "uuid"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl index 93a92c25173..85136eec7d1 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.idl +++ b/src/mongo/db/pipeline/document_source_change_stream.idl @@ -109,3 +109,12 @@ structs: deletes may appear that do not reflect actual deletions or insertions of data. Instead they reflect this data moving from one shard to another. + + showRawUpdateDescription: + cpp_name: showRawUpdateDescription + type: optionalBool + description: An internal flag indicating whether each update event in a change + stream should contain a "rawUpdateDescription" field that exposes the + raw update description from the oplog entry for the corresponding + update command. When this flag is set to true, the standard + "updateDescription" field in update events will be omitted. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index e055104b91a..6a88ccbcae3 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -92,6 +92,9 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), _changeStreamSpec); + // If the change stream spec requested raw update descriptions, make sure we honor that request. + _showRawUpdateDescription = spec.getShowRawUpdateDescription(); + // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. auto resumeAfter = spec.getResumeAfter(); @@ -246,21 +249,28 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document checkValueType(input[repl::OplogEntry::kObjectFieldName], repl::OplogEntry::kObjectFieldName, BSONType::Object); - Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); - Value updatedFields = opObject["$set"]; - Value removedFields = opObject["$unset"]; - - // Extract the field names of $unset document. - vector<Value> removedFieldsVector; - if (removedFields.getType() == BSONType::Object) { - auto iter = removedFields.getDocument().fieldIterator(); - while (iter.more()) { - removedFieldsVector.push_back(Value(iter.next().first)); + + if (_showRawUpdateDescription) { + updateDescription = input[repl::OplogEntry::kObjectFieldName]; + } else { + Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); + Value updatedFields = opObject["$set"]; + Value removedFields = opObject["$unset"]; + + // Extract the field names of $unset document. + vector<Value> removedFieldsVector; + if (removedFields.getType() == BSONType::Object) { + auto iter = removedFields.getDocument().fieldIterator(); + while (iter.more()) { + removedFieldsVector.push_back(Value(iter.next().first)); + } } + + updateDescription = Value( + Document{{"updatedFields", + updatedFields.missing() ? Value(Document()) : updatedFields}, + {"removedFields", removedFieldsVector}}); } - updateDescription = Value(Document{ - {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, - {"removedFields", removedFieldsVector}}); } else { operationType = DocumentSourceChangeStream::kReplaceOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; @@ -359,9 +369,13 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document : Value(Document{{"db", nss.db()}, {"coll", nss.coll()}})); doc.addField(DocumentSourceChangeStream::kDocumentKeyField, documentKey); - // Note that 'updateDescription' might be the 'missing' value, in which case it will not be - // serialized. - doc.addField("updateDescription", updateDescription); + // Note that the update description field might be the 'missing' value, in which case it will + // not be serialized. + auto updateDescriptionFieldName = _showRawUpdateDescription + ? DocumentSourceChangeStream::kRawUpdateDescriptionField + : DocumentSourceChangeStream::kUpdateDescriptionField; + doc.addField(updateDescriptionFieldName, updateDescription); + return doc.freeze(); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index c20f5864e67..15c2200358d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -213,6 +213,10 @@ private: // Set to true if this transformation stage can be run on the collectionless namespace. bool _isIndependentOfAnyCollection; + // Set to true if update events should contain the "rawUpdateDescription" field. When this flag + // is set to true, the standard "updateDescription" field in update events will be omitted. + bool _showRawUpdateDescription = false; + // '_fcv' is used to determine which version of the resume token to generate for each change. // This is a snapshot of what the feature compatibility version was at the time the stream was // opened or resumed. |