summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/change_stream_event_transform.cpp
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-03-30 11:52:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-30 12:21:22 +0000
commitd6dbbeb062f3fd1f596790c249015bbccb1b6433 (patch)
treece43957ddb4cb61e3a63039d910dd220b1991aca /src/mongo/db/pipeline/change_stream_event_transform.cpp
parent622b08bf511c9a632494aafcd44c7290fbda7d04 (diff)
downloadmongo-d6dbbeb062f3fd1f596790c249015bbccb1b6433.tar.gz
SERVER-63860 Restore documentKey cache for 5.0 - 6.0 upgrade
Diffstat (limited to 'src/mongo/db/pipeline/change_stream_event_transform.cpp')
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp29
1 files changed, 8 insertions, 21 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 77e97f723a8..b8169f04e28 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
#include "mongo/db/pipeline/change_stream_filter_helpers.h"
-#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
@@ -111,14 +110,12 @@ ChangeStreamEventTransformation::ChangeStreamEventTransformation(
}
ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
: ChangeStreamEventTransformation(spec) {
- // Extract the resume token or high-water-mark from the spec.
+ // Extract the resume token and use it to construct the document key cache.
auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
-
- // If the change stream spec includes a resumeToken with a shard key, populate the document
- // key cache with the field paths.
- _documentKeyCache = change_stream_legacy::buildDocumentKeyCache(tokenData);
+ _documentKeyCache = std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, tokenData);
}
std::set<std::string> ChangeStreamDefaultEventTransformation::getFieldNameDependencies() const {
@@ -166,21 +163,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
operationType = DocumentSourceChangeStream::kInsertOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
documentKey = input[repl::OplogEntry::kObject2FieldName];
- // For oplog entries written on an older version of the server, the documentKey may be
- // missing.
if (documentKey.missing()) {
- // If we are resuming from an 'insert' oplog entry that does not have a documentKey,
- // it may have been read on an older version of the server that populated the
- // documentKey fields from the sharding catalog. We populate the fields we observed
- // in the resume token in order to retain consistent event ordering around the
- // resume point during upgrade. Otherwise, we default to _id as the only document
- // key field.
- if (_documentKeyCache && _documentKeyCache->first == uuid.getUuid()) {
- documentKey = Value(document_path_support::extractPathsFromDoc(
- fullDocument.getDocument(), _documentKeyCache->second));
- } else {
- documentKey = Value(Document{{"_id", id}});
- }
+ // If the o2 field is missing from the oplog entry, it must have been generated by a
+ // version of the server prior to 5.3. Extract the documentKey via the documentKey
+ // cache instead. This may require us to look up the shard key for this UUID.
+ documentKey = _documentKeyCache->getDocumentKeyForOplogInsert(input);
}
break;
}
@@ -545,7 +532,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation(
ChangeStreamEventTransformer::ChangeStreamEventTransformer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(spec);
+ _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec);
_viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec);
_isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) ==
DocumentSourceChangeStream::ChangeStreamType::kSingleCollection;