diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2022-03-30 11:52:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-30 12:21:22 +0000 |
commit | d6dbbeb062f3fd1f596790c249015bbccb1b6433 (patch) | |
tree | ce43957ddb4cb61e3a63039d910dd220b1991aca /src/mongo/db/pipeline/change_stream_event_transform.cpp | |
parent | 622b08bf511c9a632494aafcd44c7290fbda7d04 (diff) | |
download | mongo-d6dbbeb062f3fd1f596790c249015bbccb1b6433.tar.gz |
SERVER-63860 Restore documentKey cache for 5.0 - 6.0 upgrade
Diffstat (limited to 'src/mongo/db/pipeline/change_stream_event_transform.cpp')
-rw-r--r-- | src/mongo/db/pipeline/change_stream_event_transform.cpp | 29 |
1 files changed, 8 insertions, 21 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 77e97f723a8..b8169f04e28 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/change_stream_document_diff_parser.h" #include "mongo/db/pipeline/change_stream_filter_helpers.h" -#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" @@ -111,14 +110,12 @@ ChangeStreamEventTransformation::ChangeStreamEventTransformation( } ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) : ChangeStreamEventTransformation(spec) { - // Extract the resume token or high-water-mark from the spec. + // Extract the resume token and use it to construct the document key cache. auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); - - // If the change stream spec includes a resumeToken with a shard key, populate the document - // key cache with the field paths. - _documentKeyCache = change_stream_legacy::buildDocumentKeyCache(tokenData); + _documentKeyCache = std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, tokenData); } std::set<std::string> ChangeStreamDefaultEventTransformation::getFieldNameDependencies() const { @@ -166,21 +163,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum operationType = DocumentSourceChangeStream::kInsertOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; documentKey = input[repl::OplogEntry::kObject2FieldName]; - // For oplog entries written on an older version of the server, the documentKey may be - // missing. if (documentKey.missing()) { - // If we are resuming from an 'insert' oplog entry that does not have a documentKey, - // it may have been read on an older version of the server that populated the - // documentKey fields from the sharding catalog. We populate the fields we observed - // in the resume token in order to retain consistent event ordering around the - // resume point during upgrade. Otherwise, we default to _id as the only document - // key field. - if (_documentKeyCache && _documentKeyCache->first == uuid.getUuid()) { - documentKey = Value(document_path_support::extractPathsFromDoc( - fullDocument.getDocument(), _documentKeyCache->second)); - } else { - documentKey = Value(Document{{"_id", id}}); - } + // If the o2 field is missing from the oplog entry, it must have been generated by a + // version of the server prior to 5.3. Extract the documentKey via the documentKey + // cache instead. This may require us to look up the shard key for this UUID. + documentKey = _documentKeyCache->getDocumentKeyForOplogInsert(input); } break; } @@ -545,7 +532,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation( ChangeStreamEventTransformer::ChangeStreamEventTransformer( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { - _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(spec); + _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec); _viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec); _isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) == DocumentSourceChangeStream::ChangeStreamType::kSingleCollection; |