diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2022-03-30 11:52:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-30 12:21:22 +0000 |
commit | d6dbbeb062f3fd1f596790c249015bbccb1b6433 (patch) | |
tree | ce43957ddb4cb61e3a63039d910dd220b1991aca /src/mongo | |
parent | 622b08bf511c9a632494aafcd44c7290fbda7d04 (diff) | |
download | mongo-d6dbbeb062f3fd1f596790c249015bbccb1b6433.tar.gz |
SERVER-63860 Restore documentKey cache for 5.0 - 6.0 upgrade
Diffstat (limited to 'src/mongo')
4 files changed, 113 insertions, 33 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 77e97f723a8..b8169f04e28 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/change_stream_document_diff_parser.h" #include "mongo/db/pipeline/change_stream_filter_helpers.h" -#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" @@ -111,14 +110,12 @@ ChangeStreamEventTransformation::ChangeStreamEventTransformation( } ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) : ChangeStreamEventTransformation(spec) { - // Extract the resume token or high-water-mark from the spec. + // Extract the resume token and use it to construct the document key cache. auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); - - // If the change stream spec includes a resumeToken with a shard key, populate the document - // key cache with the field paths. - _documentKeyCache = change_stream_legacy::buildDocumentKeyCache(tokenData); + _documentKeyCache = std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, tokenData); } std::set<std::string> ChangeStreamDefaultEventTransformation::getFieldNameDependencies() const { @@ -166,21 +163,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum operationType = DocumentSourceChangeStream::kInsertOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; documentKey = input[repl::OplogEntry::kObject2FieldName]; - // For oplog entries written on an older version of the server, the documentKey may be - // missing. if (documentKey.missing()) { - // If we are resuming from an 'insert' oplog entry that does not have a documentKey, - // it may have been read on an older version of the server that populated the - // documentKey fields from the sharding catalog. We populate the fields we observed - // in the resume token in order to retain consistent event ordering around the - // resume point during upgrade. Otherwise, we default to _id as the only document - // key field. - if (_documentKeyCache && _documentKeyCache->first == uuid.getUuid()) { - documentKey = Value(document_path_support::extractPathsFromDoc( - fullDocument.getDocument(), _documentKeyCache->second)); - } else { - documentKey = Value(Document{{"_id", id}}); - } + // If the o2 field is missing from the oplog entry, it must have been generated by a + // version of the server prior to 5.3. Extract the documentKey via the documentKey + // cache instead. This may require us to look up the shard key for this UUID. + documentKey = _documentKeyCache->getDocumentKeyForOplogInsert(input); } break; } @@ -545,7 +532,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation( ChangeStreamEventTransformer::ChangeStreamEventTransformer( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { - _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(spec); + _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec); _viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec); _isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) == DocumentSourceChangeStream::ChangeStreamType::kSingleCollection; diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h index b56bbe793b1..0e3ca27ddc4 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.h +++ b/src/mongo/db/pipeline/change_stream_event_transform.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/exec/document_value/document.h" +#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h" #include "mongo/db/pipeline/expression_context.h" @@ -68,14 +69,15 @@ protected: */ class ChangeStreamDefaultEventTransformation final : public ChangeStreamEventTransformation { public: - ChangeStreamDefaultEventTransformation(const DocumentSourceChangeStreamSpec& spec); + ChangeStreamDefaultEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); Document applyTransformation(const Document& fromDoc) const override; std::set<std::string> getFieldNameDependencies() const override; private: // Records the documentKey fields from the client's resume token, if present. - boost::optional<std::pair<UUID, std::vector<FieldPath>>> _documentKeyCache; + std::unique_ptr<change_stream_legacy::DocumentKeyCache> _documentKeyCache; }; /** diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index d207ef22714..07678a0235a 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/document_source_change_stream_transform.h" #include "mongo/db/pipeline/document_source_change_stream_unwind_transaction.h" #include "mongo/db/pipeline/expression.h" +#include "mongo/s/grid.h" namespace mongo { namespace change_stream_legacy { @@ -129,17 +130,19 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo return Document{opLogEntry.getObject().getOwned()}; } -boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache( - const ResumeTokenData& tokenData) { +DocumentKeyCache::DocumentKeyCache(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ResumeTokenData& tokenData) + : _expCtx(expCtx) { if (!tokenData.eventIdentifier.missing() && tokenData.uuid) { auto docKey = tokenData.eventIdentifier.getDocument(); // Newly added events store their operationType and operationDescription as the // eventIdentifier, not a documentKey. if (docKey["_id"].missing()) { - return {}; + return; } + // Extract the list of documentKey fields from the resume token. std::vector<FieldPath> docKeyFields; auto iter = docKey.fieldIterator(); while (iter.more()) { @@ -147,9 +150,77 @@ boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache( docKeyFields.push_back(fieldPair.first); } - return std::make_pair(tokenData.uuid.get(), docKeyFields); + // If the document key from the resume token has more than one field, that means it + // includes the shard key and thus should never change. + const bool isFinal = docKeyFields.size() > 1; + + _cache[tokenData.uuid.get()] = std::make_pair(docKeyFields, isFinal); + } +} + +Value DocumentKeyCache::getDocumentKeyForOplogInsert(Document oplogInsert) { + tassert(63860, + "Expected 'insert' oplog entry", + oplogInsert["op"].getType() == BSONType::String && + oplogInsert["op"].getStringData() == "i"_sd); + + auto nss = NamespaceString(oplogInsert["ns"].getStringData()); + auto uuid = oplogInsert["ui"].getUuid(); + + // Extract the documentKey fields from the cache, or add them if not already present. + auto it = _cache.find(uuid); + if (it == _cache.end() || !it->second.second) { + auto docKeyFields = _collectDocumentKeyFieldsForHostedCollection(nss, uuid); + if (it == _cache.end() || docKeyFields.second) { + _cache[uuid] = docKeyFields; + } + } + + // Extract the documentKey values from the inserted document. + return Value(document_path_support::extractPathsFromDoc(oplogInsert["o"].getDocument(), + _cache[uuid].first)); +} + +DocumentKeyCache::DocumentKeyCacheEntry +DocumentKeyCache::_collectDocumentKeyFieldsForHostedCollection(const NamespaceString& nss, + const UUID& uuid) const { + // If this is a replica set, nothing is sharded and never will be. + if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { + return {{"_id"}, true}; + } + + auto* const catalogCache = Grid::get(_expCtx->opCtx)->catalogCache(); + auto swCM = catalogCache->getCollectionRoutingInfo(_expCtx->opCtx, nss); + if (swCM.isOK()) { + const auto& cm = swCM.getValue(); + if (cm.isSharded() && cm.uuidMatches(uuid)) { + // Unpack the shard key. Collection is sharded so mark as final. + const auto& shardKeyPattern = cm.getShardKeyPattern().getKeyPatternFields(); + return {_shardKeyToDocumentKeyFields(shardKeyPattern), true}; + } + } else if (swCM != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(std::move(swCM)); + } + + // An unsharded collection can still become sharded so is not final. If the uuid doesn't match + // the one stored in the ScopedCollectionDescription, this implies that the collection has been + // dropped and recreated as sharded. We don't know what the old document key fields might have + // been in this case so we return just _id. + return {{"_id"}, false}; +} + +std::vector<FieldPath> DocumentKeyCache::_shardKeyToDocumentKeyFields( + const std::vector<std::unique_ptr<FieldRef>>& keyPatternFields) const { + std::vector<FieldPath> result; + bool gotId = false; + for (auto& field : keyPatternFields) { + result.emplace_back(field->dottedField()); + gotId |= (result.back().fullPath() == "_id"); + } + if (!gotId) { // If not part of the shard key, "_id" comes last. + result.emplace_back("_id"); } - return {}; + return result; } } // namespace change_stream_legacy diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h index 2c30cef0a91..40df4e139bf 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h @@ -27,6 +27,8 @@ * it in the license file. */ +#pragma once + #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_change_stream_gen.h" @@ -48,12 +50,30 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo const Document& preImageId); /** - * Builds document key cache from the resume token. The cache will be used when the insert oplog - * entry does not contain the documentKey. This can happen when reading an oplog entry written by an - * older version of the server. + * Maintains a document key cache. The cache will be used when the insert oplog entry does not + * contain the documentKey. This can happen when reading an oplog entry written by an older version + * of the server. Extracts the documentKey from the resume token on construction, and consults the + * sharding catalog as needed when getDocumentKeyForOplogInsert is called. + * TODO SERVER-64992: remove documentKey caching after branching for 6.0. */ -boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache( - const ResumeTokenData& data); +class DocumentKeyCache { +public: + using DocumentKeyCacheEntry = std::pair<std::vector<FieldPath>, bool>; + + DocumentKeyCache(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const ResumeTokenData& token); + + Value getDocumentKeyForOplogInsert(Document oplogInsert); + +private: + DocumentKeyCacheEntry _collectDocumentKeyFieldsForHostedCollection(const NamespaceString& nss, + const UUID& uuid) const; + std::vector<FieldPath> _shardKeyToDocumentKeyFields( + const std::vector<std::unique_ptr<FieldRef>>& keyPatternFields) const; + + boost::intrusive_ptr<ExpressionContext> _expCtx; + std::map<UUID, DocumentKeyCacheEntry> _cache; +}; /** * Represents the change stream operation types that are NOT guarded behind the 'showExpandedEvents' |