summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author Bernard Gorman <bernard.gorman@gmail.com> 2022-03-30 11:52:45 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-30 12:21:22 +0000
commit d6dbbeb062f3fd1f596790c249015bbccb1b6433 (patch)
tree ce43957ddb4cb61e3a63039d910dd220b1991aca /src
parent 622b08bf511c9a632494aafcd44c7290fbda7d04 (diff)
download mongo-d6dbbeb062f3fd1f596790c249015bbccb1b6433.tar.gz
SERVER-63860 Restore documentKey cache for 5.0 - 6.0 upgrade
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/pipeline/change_stream_event_transform.cpp 29
-rw-r--r-- src/mongo/db/pipeline/change_stream_event_transform.h 6
-rw-r--r-- src/mongo/db/pipeline/change_stream_helpers_legacy.cpp 81
-rw-r--r-- src/mongo/db/pipeline/change_stream_helpers_legacy.h 30
4 files changed, 113 insertions, 33 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 77e97f723a8..b8169f04e28 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
#include "mongo/db/pipeline/change_stream_filter_helpers.h"
-#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
@@ -111,14 +110,12 @@ ChangeStreamEventTransformation::ChangeStreamEventTransformation(
}
ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
: ChangeStreamEventTransformation(spec) {
- // Extract the resume token or high-water-mark from the spec.
+ // Extract the resume token and use it to construct the document key cache.
auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
-
- // If the change stream spec includes a resumeToken with a shard key, populate the document
- // key cache with the field paths.
- _documentKeyCache = change_stream_legacy::buildDocumentKeyCache(tokenData);
+ _documentKeyCache = std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, tokenData);
}
std::set<std::string> ChangeStreamDefaultEventTransformation::getFieldNameDependencies() const {
@@ -166,21 +163,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
operationType = DocumentSourceChangeStream::kInsertOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
documentKey = input[repl::OplogEntry::kObject2FieldName];
- // For oplog entries written on an older version of the server, the documentKey may be
- // missing.
if (documentKey.missing()) {
- // If we are resuming from an 'insert' oplog entry that does not have a documentKey,
- // it may have been read on an older version of the server that populated the
- // documentKey fields from the sharding catalog. We populate the fields we observed
- // in the resume token in order to retain consistent event ordering around the
- // resume point during upgrade. Otherwise, we default to _id as the only document
- // key field.
- if (_documentKeyCache && _documentKeyCache->first == uuid.getUuid()) {
- documentKey = Value(document_path_support::extractPathsFromDoc(
- fullDocument.getDocument(), _documentKeyCache->second));
- } else {
- documentKey = Value(Document{{"_id", id}});
- }
+ // If the o2 field is missing from the oplog entry, it must have been generated by a
+ // version of the server prior to 5.3. Extract the documentKey via the documentKey
+ // cache instead. This may require us to look up the shard key for this UUID.
+ documentKey = _documentKeyCache->getDocumentKeyForOplogInsert(input);
}
break;
}
@@ -545,7 +532,7 @@ Document ChangeStreamViewDefinitionEventTransformation::applyTransformation(
ChangeStreamEventTransformer::ChangeStreamEventTransformer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(spec);
+ _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec);
_viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec);
_isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) ==
DocumentSourceChangeStream::ChangeStreamType::kSingleCollection;
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h
index b56bbe793b1..0e3ca27ddc4 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.h
+++ b/src/mongo/db/pipeline/change_stream_event_transform.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -68,14 +69,15 @@ protected:
*/
class ChangeStreamDefaultEventTransformation final : public ChangeStreamEventTransformation {
public:
- ChangeStreamDefaultEventTransformation(const DocumentSourceChangeStreamSpec& spec);
+ ChangeStreamDefaultEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
Document applyTransformation(const Document& fromDoc) const override;
std::set<std::string> getFieldNameDependencies() const override;
private:
// Records the documentKey fields from the client's resume token, if present.
- boost::optional<std::pair<UUID, std::vector<FieldPath>>> _documentKeyCache;
+ std::unique_ptr<change_stream_legacy::DocumentKeyCache> _documentKeyCache;
};
/**
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index d207ef22714..07678a0235a 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
#include "mongo/db/pipeline/document_source_change_stream_unwind_transaction.h"
#include "mongo/db/pipeline/expression.h"
+#include "mongo/s/grid.h"
namespace mongo {
namespace change_stream_legacy {
@@ -129,17 +130,19 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
return Document{opLogEntry.getObject().getOwned()};
}
-boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
- const ResumeTokenData& tokenData) {
+DocumentKeyCache::DocumentKeyCache(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ResumeTokenData& tokenData)
+ : _expCtx(expCtx) {
if (!tokenData.eventIdentifier.missing() && tokenData.uuid) {
auto docKey = tokenData.eventIdentifier.getDocument();
// Newly added events store their operationType and operationDescription as the
// eventIdentifier, not a documentKey.
if (docKey["_id"].missing()) {
- return {};
+ return;
}
+ // Extract the list of documentKey fields from the resume token.
std::vector<FieldPath> docKeyFields;
auto iter = docKey.fieldIterator();
while (iter.more()) {
@@ -147,9 +150,77 @@ boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
docKeyFields.push_back(fieldPair.first);
}
- return std::make_pair(tokenData.uuid.get(), docKeyFields);
+ // If the document key from the resume token has more than one field, that means it
+ // includes the shard key and thus should never change.
+ const bool isFinal = docKeyFields.size() > 1;
+
+ _cache[tokenData.uuid.get()] = std::make_pair(docKeyFields, isFinal);
+ }
+}
+
+Value DocumentKeyCache::getDocumentKeyForOplogInsert(Document oplogInsert) {
+ tassert(63860,
+ "Expected 'insert' oplog entry",
+ oplogInsert["op"].getType() == BSONType::String &&
+ oplogInsert["op"].getStringData() == "i"_sd);
+
+ auto nss = NamespaceString(oplogInsert["ns"].getStringData());
+ auto uuid = oplogInsert["ui"].getUuid();
+
+ // Reuse a final cache entry as-is; for a missing or non-final entry, re-derive the fields.
+ auto it = _cache.find(uuid);
+ if (it == _cache.end() || !it->second.second) {
+ auto docKeyFields = _collectDocumentKeyFieldsForHostedCollection(nss, uuid);
+ if (it == _cache.end() || docKeyFields.second) {  // Keep an existing entry unless the new one is final.
+ _cache[uuid] = docKeyFields;
+ }
+ }
+
+ // Project the cached documentKey field paths out of the inserted document (the 'o' field).
+ return Value(document_path_support::extractPathsFromDoc(oplogInsert["o"].getDocument(),
+ _cache[uuid].first));
+}
+
+DocumentKeyCache::DocumentKeyCacheEntry
+DocumentKeyCache::_collectDocumentKeyFieldsForHostedCollection(const NamespaceString& nss,
+ const UUID& uuid) const {
+ // If this node is not running as a shard server, treat the collection as permanently unsharded.
+ if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
+ return {{"_id"}, true};
+ }
+
+ auto* const catalogCache = Grid::get(_expCtx->opCtx)->catalogCache();
+ auto swCM = catalogCache->getCollectionRoutingInfo(_expCtx->opCtx, nss);
+ if (swCM.isOK()) {
+ const auto& cm = swCM.getValue();
+ if (cm.isSharded() && cm.uuidMatches(uuid)) {
+ // Unpack the shard key. Collection is sharded so mark as final.
+ const auto& shardKeyPattern = cm.getShardKeyPattern().getKeyPatternFields();
+ return {_shardKeyToDocumentKeyFields(shardKeyPattern), true};
+ }
+ } else if (swCM != ErrorCodes::NamespaceNotFound) {
+ uassertStatusOK(std::move(swCM));
+ }
+
+ // Reached when the namespace was not found, the collection is unsharded, or the uuid does not
+ // match the routing info from the catalog cache (presumably the collection has been dropped and
+ // recreated). An unsharded collection can still become sharded, so the entry is not final; the
+ // old document key fields are unknown in the mismatch case, so we return just _id.
+ return {{"_id"}, false};
+}
+
+std::vector<FieldPath> DocumentKeyCache::_shardKeyToDocumentKeyFields(
+ const std::vector<std::unique_ptr<FieldRef>>& keyPatternFields) const {
+ std::vector<FieldPath> result;
+ bool gotId = false;
+ for (auto& field : keyPatternFields) {  // Fields are emitted in shard key order.
+ result.emplace_back(field->dottedField());
+ gotId |= (result.back().fullPath() == "_id");
+ }
+ if (!gotId) { // The result always contains "_id"; append it last if the shard key lacks it.
+ result.emplace_back("_id");
}
- return {};
+ return result;
}
} // namespace change_stream_legacy
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index 2c30cef0a91..40df4e139bf 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#pragma once
+
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
@@ -48,12 +50,30 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
const Document& preImageId);
/**
- * Builds document key cache from the resume token. The cache will be used when the insert oplog
- * entry does not contain the documentKey. This can happen when reading an oplog entry written by an
- * older version of the server.
+ * Maintains a document key cache. The cache will be used when the insert oplog entry does not
+ * contain the documentKey. This can happen when reading an oplog entry written by an older version
+ * of the server. Extracts the documentKey from the resume token on construction, and consults the
+ * sharding catalog as needed when getDocumentKeyForOplogInsert is called.
+ * TODO SERVER-64992: remove documentKey caching after branching for 6.0.
*/
-boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
- const ResumeTokenData& data);
+class DocumentKeyCache {
+public:
+ using DocumentKeyCacheEntry = std::pair<std::vector<FieldPath>, bool>;
+
+ DocumentKeyCache(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ResumeTokenData& token);
+
+ Value getDocumentKeyForOplogInsert(Document oplogInsert);
+
+private:
+ DocumentKeyCacheEntry _collectDocumentKeyFieldsForHostedCollection(const NamespaceString& nss,
+ const UUID& uuid) const;
+ std::vector<FieldPath> _shardKeyToDocumentKeyFields(
+ const std::vector<std::unique_ptr<FieldRef>>& keyPatternFields) const;
+
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
+ std::map<UUID, DocumentKeyCacheEntry> _cache;
+};
/**
* Represents the change stream operation types that are NOT guarded behind the 'showExpandedEvents'