From 572a0c3987787a66f0192009b30651b8bae822ff Mon Sep 17 00:00:00 2001 From: Davis Haupt Date: Mon, 15 May 2023 13:22:59 +0000 Subject: SERVER-75512 key telemetry store on hash rather than BSONObj --- .../telemetry/telemetry_collect_on_mongos.js | 12 ++-- src/mongo/db/clientcursor.cpp | 5 +- src/mongo/db/clientcursor.h | 3 +- src/mongo/db/commands/find_cmd.cpp | 2 +- src/mongo/db/commands/run_aggregate.cpp | 2 +- src/mongo/db/curop.h | 2 + src/mongo/db/cursor_manager.cpp | 2 + .../db/pipeline/document_source_telemetry.cpp | 9 +-- src/mongo/db/query/find_request_shapifier.cpp | 11 ++- src/mongo/db/query/request_shapifier.h | 8 +++ src/mongo/db/query/serialization_options.h | 2 + src/mongo/db/query/telemetry.cpp | 65 ++++++++++-------- src/mongo/db/query/telemetry.h | 43 ++++++------ src/mongo/db/query/telemetry_store_test.cpp | 79 +++++++++++++--------- src/mongo/s/query/cluster_client_cursor_impl.cpp | 4 +- src/mongo/s/query/cluster_client_cursor_impl.h | 3 + src/mongo/s/query/cluster_cursor_manager.cpp | 1 + 17 files changed, 147 insertions(+), 106 deletions(-) diff --git a/jstests/noPassthrough/telemetry/telemetry_collect_on_mongos.js b/jstests/noPassthrough/telemetry/telemetry_collect_on_mongos.js index 97e4f9df3cf..ff9fadc85c7 100644 --- a/jstests/noPassthrough/telemetry/telemetry_collect_on_mongos.js +++ b/jstests/noPassthrough/telemetry/telemetry_collect_on_mongos.js @@ -86,7 +86,7 @@ const assertExpectedResults = (results, // Since the cursor hasn't been exhausted yet, ensure no telemetry results have been written // yet. let telemetry = getTelemetry(db); - assert.eq(0, telemetry.length); + assert.eq(0, telemetry.length, telemetry); // Run a getMore to exhaust the cursor, then ensure telemetry results have been written // accurately. batchSize must be 2 so the cursor recognizes exhaustion. @@ -97,7 +97,7 @@ const assertExpectedResults = (results, })); // returns 1 doc, exhausts the cursor // The $telemetry query for the previous `getTelemetry` is included in this call to $telemetry. telemetry = getTelemetry(db); - assert.eq(2, telemetry.length); + assert.eq(2, telemetry.length, telemetry); assertExpectedResults(telemetry[0], telemetryKey, /* expectedExecCount */ 1, @@ -112,7 +112,7 @@ const assertExpectedResults = (results, coll.find({v: {$gt: 0, $lt: 1}}).batchSize(10).toArray(); // returns 0 docs coll.find({v: {$gt: 0, $lt: 2}}).batchSize(10).toArray(); // return 1 doc telemetry = getTelemetry(db); - assert.eq(2, telemetry.length); + assert.eq(2, telemetry.length, telemetry); assertExpectedResults(telemetry[0], telemetryKey, /* expectedExecCount */ 4, @@ -150,7 +150,7 @@ const assertExpectedResults = (results, // Since the cursor hasn't been exhausted yet, ensure no telemetry results have been written // yet. let telemetry = getTelemetry(db); - assert.eq(0, telemetry.length); + assert.eq(0, telemetry.length, telemetry); // Run a getMore to exhaust the cursor, then ensure telemetry results have been written // accurately. batchSize must be 2 so the cursor recognizes exhaustion. @@ -161,7 +161,7 @@ const assertExpectedResults = (results, })); // returns 1 doc, exhausts the cursor // The $telemetry query for the previous `getTelemetry` is included in this call to $telemetry. telemetry = getTelemetry(db); - assert.eq(2, telemetry.length); + assert.eq(2, telemetry.length, telemetry); assertExpectedResults(telemetry[0], telemetryKey, /* expectedExecCount */ 1, @@ -185,7 +185,7 @@ const assertExpectedResults = (results, {$project: {hello: "$galaxy"}}, ]); // returns 1 doc telemetry = getTelemetry(db); - assert.eq(2, telemetry.length); + assert.eq(2, telemetry.length, telemetry); assertExpectedResults(telemetry[0], telemetryKey, /* expectedExecCount */ 4, diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index a18b2fec45b..55e116e5893 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -124,6 +124,7 @@ ClientCursor::ClientCursor(ClientCursorParams params, _planSummary(_exec->getPlanExplainer().getPlanSummary()), _planCacheKey(CurOp::get(operationUsingCursor)->debug().planCacheKey), _queryHash(CurOp::get(operationUsingCursor)->debug().queryHash), + _telemetryStoreKeyHash(CurOp::get(operationUsingCursor)->debug().telemetryStoreKeyHash), _telemetryStoreKey(CurOp::get(operationUsingCursor)->debug().telemetryStoreKey), _telemetryRequestShapifier( std::move(CurOp::get(operationUsingCursor)->debug().telemetryRequestShapifier)), @@ -160,8 +161,9 @@ void ClientCursor::dispose(OperationContext* opCtx, boost::optional now) return; } - if (_telemetryStoreKey && opCtx) { + if (_telemetryStoreKeyHash && opCtx) { telemetry::writeTelemetry(opCtx, + _telemetryStoreKeyHash, _telemetryStoreKey, std::move(_telemetryRequestShapifier), _metrics.executionTime.value_or(Microseconds{0}).count(), @@ -406,6 +408,7 @@ void collectTelemetryMongod(OperationContext* opCtx, auto& opDebug = CurOp::get(opCtx)->debug(); telemetry::writeTelemetry( opCtx, + opDebug.telemetryStoreKeyHash, opDebug.telemetryStoreKey, std::move(requestShapifier), opDebug.additiveMetrics.executionTime.value_or(Microseconds{0}).count(), diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 68c0a48e260..9e7d35ade9a 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -447,8 +447,9 @@ private: boost::optional _planCacheKey; boost::optional _queryHash; - // The shape of the original query serialized with readConcern, application name, and namespace. // If boost::none, telemetry should not be collected for this cursor. + boost::optional _telemetryStoreKeyHash; + // TODO: SERVER-73152 remove telemetryStoreKey when RequestShapifier is used for agg. boost::optional _telemetryStoreKey; // Metrics that are accumulated over the lifetime of the cursor, incremented with each getMore. // Useful for diagnostics like telemetry. diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 2c261b751b2..90e6fa15ca1 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -782,7 +782,7 @@ public: } // Set the telemetryStoreKey to none so telemetry isn't collected when we've done a // FLE rewrite. - CurOp::get(opCtx)->debug().telemetryStoreKey = boost::none; + CurOp::get(opCtx)->debug().telemetryStoreKeyHash = boost::none; CurOp::get(opCtx)->debug().shouldOmitDiagnosticInformation = true; } diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 8a2d675982d..4a95b56eb13 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -1054,7 +1054,7 @@ Status runAggregate(OperationContext* opCtx, // Set the telemetryStoreKey to none so telemetry isn't collected when we've done a FLE // rewrite. - CurOp::get(opCtx)->debug().telemetryStoreKey = boost::none; + CurOp::get(opCtx)->debug().telemetryStoreKeyHash = boost::none; } pipeline->optimizePipeline(); diff --git a/src/mongo/db/curop.h b/src/mongo/db/curop.h index 2358cd17b30..352b10961b0 100644 --- a/src/mongo/db/curop.h +++ b/src/mongo/db/curop.h @@ -294,6 +294,8 @@ public: boost::optional queryHash; // The shape of the original query serialized with readConcern, application name, and namespace. // If boost::none, telemetry should not be collected for this operation. + boost::optional telemetryStoreKeyHash; + // TODO: SERVER-73152 remove telemetryStoreKey when RequestShapifier is used for agg. boost::optional telemetryStoreKey; // The RequestShapifier used by telemetry to shapify the request payload into the telemetry // store key. diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index 0a499fe586c..ac9c41accfd 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -215,6 +215,8 @@ StatusWith CursorManager::pinCursor( CurOp::get(opCtx)->debug().planCacheKey = cursor->_planCacheKey; // Pass along telemetry context so it is retrievable after query execution for storing metrics. + CurOp::get(opCtx)->debug().telemetryStoreKeyHash = cursor->_telemetryStoreKeyHash; + // TODO: SERVER-73152 remove telemetryStoreKey when RequestShapifier is used for agg. CurOp::get(opCtx)->debug().telemetryStoreKey = cursor->_telemetryStoreKey; cursor->_operationUsingCursor = opCtx; diff --git a/src/mongo/db/pipeline/document_source_telemetry.cpp b/src/mongo/db/pipeline/document_source_telemetry.cpp index ad1ce6cea2a..b037515796f 100644 --- a/src/mongo/db/pipeline/document_source_telemetry.cpp +++ b/src/mongo/db/pipeline/document_source_telemetry.cpp @@ -190,9 +190,9 @@ DocumentSource::GetNextResult DocumentSourceTelemetry::doGetNext() { Timestamp{Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0)}; for (auto&& [key, metrics] : *partition) { try { - auto hmacKey = metrics->makeTelemetryKey( - key, _applyHmacToIdentifiers, _hmacKey, pExpCtx->opCtx); - _materializedPartition.push_back({{"key", std::move(hmacKey)}, + auto telemetryKey = + metrics->computeTelemetryKey(pExpCtx->opCtx, _applyHmacToIdentifiers, _hmacKey); + _materializedPartition.push_back({{"key", std::move(telemetryKey)}, {"metrics", metrics->toBSON()}, {"asOf", partitionReadTime}}); } catch (const DBException& ex) { @@ -201,7 +201,8 @@ DocumentSource::GetNextResult DocumentSourceTelemetry::doGetNext() { 3, "Error encountered when applying hmac to query shape, will not publish " "telemetry for this entry.", - "status"_attr = ex.toStatus()); + "status"_attr = ex.toStatus(), + "hash"_attr = key); if (kDebugBuild) { tasserted(7349401, "Was not able to re-parse telemetry key when reading telemetry."); diff --git a/src/mongo/db/query/find_request_shapifier.cpp b/src/mongo/db/query/find_request_shapifier.cpp index fdb96a6ad19..8002a152a13 100644 --- a/src/mongo/db/query/find_request_shapifier.cpp +++ b/src/mongo/db/query/find_request_shapifier.cpp @@ -40,11 +40,6 @@ void addNonShapeObjCmdLiterals(BSONObjBuilder* bob, const FindCommandRequest& findCommand, const SerializationOptions& opts, const boost::intrusive_ptr& expCtx) { - - if (const auto& comment = expCtx->opCtx->getComment()) { - opts.appendLiteral(bob, "comment", *comment); - } - if (auto noCursorTimeout = findCommand.getNoCursorTimeout()) { // Capture whether noCursorTimeout was specified in the query, do not distinguish between // true or false. @@ -95,9 +90,11 @@ BSONObj FindRequestShapifier::makeTelemetryKey( _request.getAllowPartialResults().value_or(false)); } - // Fields for literal redaction. Adds comment, batchSize, maxTimeMS, and noCursorTimeOut. + // Fields for literal redaction. Adds batchSize, maxTimeMS, and noCursorTimeOut. addNonShapeObjCmdLiterals(&bob, _request, opts, expCtx); - + if (_comment) { + opts.appendLiteral(&bob, "comment", *_comment); + } if (_applicationName.has_value()) { bob.append("applicationName", _applicationName.value()); } diff --git a/src/mongo/db/query/request_shapifier.h b/src/mongo/db/query/request_shapifier.h index 6f3d79f61be..1bae8f913f9 100644 --- a/src/mongo/db/query/request_shapifier.h +++ b/src/mongo/db/query/request_shapifier.h @@ -67,8 +67,16 @@ protected: _applicationName = metadata->getApplicationName().toString(); } } + if (const auto& comment = opCtx->getComment()) { + BSONObjBuilder commentBuilder; + commentBuilder.append(*comment); + _commentObj = commentBuilder.obj(); + _comment = _commentObj.firstElement(); + } } boost::optional _applicationName; + BSONObj _commentObj; + boost::optional _comment = boost::none; }; } // namespace mongo::telemetry diff --git a/src/mongo/db/query/serialization_options.h b/src/mongo/db/query/serialization_options.h index 3b2159585c4..22e376ee4e2 100644 --- a/src/mongo/db/query/serialization_options.h +++ b/src/mongo/db/query/serialization_options.h @@ -95,6 +95,8 @@ struct SerializationOptions { applyHmacToIdentifiers(fieldNamesHmacPolicy_), identifierHmacPolicy(fieldNamesHmacPolicy_) {} + SerializationOptions(LiteralSerializationPolicy policy) : literalPolicy(policy) {} + // Helper function for removing identifiable information (like collection/db names). // Note: serializeFieldPath/serializeFieldPathFromString should be used for field // names. diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index 10da833147e..af17da7af02 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -46,6 +46,7 @@ #include "mongo/db/query/query_planner_params.h" #include "mongo/db/query/query_request_helper.h" #include "mongo/db/query/rate_limiting.h" +#include "mongo/db/query/serialization_options.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/query/telemetry_util.h" #include "mongo/logv2/log.h" @@ -320,13 +321,13 @@ std::string fleSafeFieldNameRedactor(const BSONElement& e) { * Append the element to the builder and apply hmac to any literals within the element. The element * may be of any type. */ -void appendWithHmacAppliedLiterals(BSONObjBuilder& builder, const BSONElement& el) { +void appendWithAbstractedLiterals(BSONObjBuilder& builder, const BSONElement& el) { if (el.type() == Object) { builder.append(el.fieldNameStringData(), el.Obj().redact(false, fleSafeFieldNameRedactor)); } else if (el.type() == Array) { BSONObjBuilder arrayBuilder = builder.subarrayStart(fleSafeFieldNameRedactor(el)); for (auto&& arrayElem : el.Obj()) { - appendWithHmacAppliedLiterals(arrayBuilder, arrayElem); + appendWithAbstractedLiterals(arrayBuilder, arrayElem); } arrayBuilder.done(); } else { @@ -337,21 +338,18 @@ void appendWithHmacAppliedLiterals(BSONObjBuilder& builder, const BSONElement& e static const StringData replacementForLiteralArgs = "?"_sd; -} // namespace +std::size_t hash(const BSONObj& obj) { + return absl::hash_internal::CityHash64(obj.objdata(), obj.objsize()); +} -BSONObj TelemetryEntry::makeTelemetryKey(const BSONObj& key, - bool applyHmacToIdentifiers, - std::string hmacKey, - OperationContext* opCtx) const { - if (!applyHmacToIdentifiers) { - return key; - } +} // namespace - if (_hmacAppliedKey) { - return *_hmacAppliedKey; - } +BSONObj TelemetryEntry::computeTelemetryKey(OperationContext* opCtx, + bool applyHmacToIdentifiers, + std::string hmacKey) const { // The telemetry key for find queries is generated by serializing all the command fields - // and applied hmac if SerializationOptions indicate to do so. The resulting key is of the form: + // and applying hmac if SerializationOptions indicate to do so. The resulting key is of the + // form: // { // queryShape: { // cmdNs: {db: "...", coll: "..."}, @@ -370,7 +368,7 @@ BSONObj TelemetryEntry::makeTelemetryKey(const BSONObj& key, ? SerializationOptions( [&](StringData sd) { return sha256HmacStringDataHasher(hmacKey, sd); }, LiteralSerializationPolicy::kToDebugTypeString) - : SerializationOptions(false); + : SerializationOptions(LiteralSerializationPolicy::kToDebugTypeString); return requestShapifier->makeTelemetryKey(serializationOpts, opCtx); } @@ -384,8 +382,13 @@ BSONObj TelemetryEntry::makeTelemetryKey(const BSONObj& key, // { "$addFields" : { "x" : { "$someExpr" {} } } } ], // We should preserve the top-level stage names in the pipeline but apply hmac to all field // names of children. + + // TODO: SERVER-73152 literal and field name redaction for aggregate command. + if (!applyHmacToIdentifiers) { + return oldTelemetryKey; + } BSONObjBuilder hmacAppliedBuilder; - for (BSONElement e : key) { + for (BSONElement e : oldTelemetryKey) { if ((e.type() == Object || e.type() == Array) && kKeysToApplyHmac.count(e.fieldNameStringData().toString()) == 1) { auto hmacApplicator = [&](BSONObjBuilder subObj, const BSONObj& obj) { @@ -416,8 +419,7 @@ BSONObj TelemetryEntry::makeTelemetryKey(const BSONObj& key, hmacAppliedBuilder.append(e); } } - _hmacAppliedKey = hmacAppliedBuilder.obj(); - return *_hmacAppliedKey; + return hmacAppliedBuilder.obj(); } // The originating command/query does not persist through the end of query execution. In order to @@ -453,7 +455,7 @@ void registerAggRequest(const AggregateCommandRequest& request, OperationContext try { for (auto&& stage : request.getPipeline()) { BSONObjBuilder stageBuilder = pipelineBuilder.subobjStart("stage"_sd); - appendWithHmacAppliedLiterals(stageBuilder, stage.firstElement()); + appendWithAbstractedLiterals(stageBuilder, stage.firstElement()); stageBuilder.done(); } pipelineBuilder.done(); @@ -468,7 +470,9 @@ void registerAggRequest(const AggregateCommandRequest& request, OperationContext return; } - CurOp::get(opCtx)->debug().telemetryStoreKey = telemetryKey.obj(); + BSONObj key = telemetryKey.obj(); + CurOp::get(opCtx)->debug().telemetryStoreKeyHash = hash(key); + CurOp::get(opCtx)->debug().telemetryStoreKey = key.getOwned(); } void registerRequest(std::unique_ptr requestShapifier, @@ -490,8 +494,8 @@ void registerRequest(std::unique_ptr requestShapifier, SerializationOptions options; options.literalPolicy = LiteralSerializationPolicy::kToDebugTypeString; options.replacementForLiteralArgs = replacementForLiteralArgs; - CurOp::get(opCtx)->debug().telemetryStoreKey = - requestShapifier->makeTelemetryKey(options, expCtx); + CurOp::get(opCtx)->debug().telemetryStoreKeyHash = + hash(requestShapifier->makeTelemetryKey(options, expCtx)); CurOp::get(opCtx)->debug().telemetryRequestShapifier = std::move(requestShapifier); } @@ -504,26 +508,29 @@ TelemetryStore& getTelemetryStore(OperationContext* opCtx) { } void writeTelemetry(OperationContext* opCtx, + boost::optional telemetryKeyHash, boost::optional telemetryKey, std::unique_ptr requestShapifier, const uint64_t queryExecMicros, const uint64_t docsReturned) { - if (!telemetryKey) { + if (!telemetryKeyHash) { return; } auto&& telemetryStore = getTelemetryStore(opCtx); - auto&& [statusWithMetrics, partitionLock] = telemetryStore.getWithPartitionLock(*telemetryKey); + auto&& [statusWithMetrics, partitionLock] = + telemetryStore.getWithPartitionLock(*telemetryKeyHash); std::shared_ptr metrics; if (statusWithMetrics.isOK()) { metrics = *statusWithMetrics.getValue(); } else { + BSONObj key = telemetryKey.value_or(BSONObj{}); size_t numEvicted = - telemetryStore.put(*telemetryKey, - std::make_shared(std::move(requestShapifier), - CurOp::get(opCtx)->getNSS()), + telemetryStore.put(*telemetryKeyHash, + std::make_shared( + std::move(requestShapifier), CurOp::get(opCtx)->getNSS(), key), partitionLock); telemetryEvictedMetric.increment(numEvicted); - auto newMetrics = partitionLock->get(*telemetryKey); + auto newMetrics = partitionLock->get(*telemetryKeyHash); if (!newMetrics.isOK()) { // This can happen if the budget is immediately exceeded. Specifically if the there is // not enough room for a single new entry if the number of partitions is too high @@ -533,7 +540,7 @@ void writeTelemetry(OperationContext* opCtx, 1, "Failed to store telemetry entry.", "status"_attr = newMetrics.getStatus(), - "rawKey"_attr = redact(*telemetryKey)); + "telemetryKeyHash"_attr = telemetryKeyHash); return; } metrics = newMetrics.getValue()->second; diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h index 9d2fdaa7b5b..e7e0f3ccfd1 100644 --- a/src/mongo/db/query/telemetry.h +++ b/src/mongo/db/query/telemetry.h @@ -99,10 +99,13 @@ extern CounterMetric telemetryStoreSizeEstimateBytesMetric; // Used to aggregate the metrics for one telemetry key over all its executions. class TelemetryEntry { public: - TelemetryEntry(std::unique_ptr requestShapifier, NamespaceStringOrUUID nss) + TelemetryEntry(std::unique_ptr requestShapifier, + NamespaceStringOrUUID nss, + const BSONObj& cmdObj) : firstSeenTimestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0), requestShapifier(std::move(requestShapifier)), - nss(nss) { + nss(nss), + oldTelemetryKey(cmdObj.copy()) { telemetryStoreSizeEstimateBytesMetric.increment(sizeof(TelemetryEntry) + sizeof(BSONObj)); } @@ -123,10 +126,9 @@ public: /** * Redact a given telemetry key and set _keySize. */ - BSONObj makeTelemetryKey(const BSONObj& key, - bool applyHmacToIdentifiers, - std::string hmacKey, - OperationContext* opCtx) const; + BSONObj computeTelemetryKey(OperationContext* opCtx, + bool applyHmacToIdentifiers, + std::string hmacKey) const; /** * Timestamp for when this query shape was added to the store. Set on construction. @@ -151,35 +153,31 @@ public: NamespaceStringOrUUID nss; -private: - /** - * We cache the hmac applied key the first time it's computed. - */ - mutable boost::optional _hmacAppliedKey; + // TODO: SERVER-73152 remove oldTelemetryKey when RequestShapifier is used for agg. + BSONObj oldTelemetryKey; }; struct TelemetryPartitioner { // The partitioning function for use with the 'Partitioned' utility. - std::size_t operator()(const BSONObj& k, const std::size_t nPartitions) const { - return SimpleBSONObjComparator::Hasher()(k) % nPartitions; + std::size_t operator()(const std::size_t k, const std::size_t nPartitions) const { + return k % nPartitions; } }; struct TelemetryStoreEntryBudgetor { - size_t operator()(const BSONObj& key, const std::shared_ptr& value) { - // The buget estimator for pair in LRU cache accounts for size of value - // (TelemetryEntry) size of the key, and size of the key's underlying data struture - // (BSONObj). - return sizeof(TelemetryEntry) + sizeof(BSONObj) + key.objsize(); + size_t operator()(const std::size_t key, const std::shared_ptr& value) { + // The buget estimator for pair in LRU cache accounts for the size of the key + // and the size of the metrics, including the bson object used for generating the telemetry + // key at read time. + + return sizeof(TelemetryEntry) + sizeof(std::size_t) + value->oldTelemetryKey.objsize(); } }; -using TelemetryStore = PartitionedCache, TelemetryStoreEntryBudgetor, - TelemetryPartitioner, - SimpleBSONObjComparator::Hasher, - SimpleBSONObjComparator::EqualTo>; + TelemetryPartitioner>; /** * Acquire a reference to the global telemetry store. @@ -208,6 +206,7 @@ void registerRequest(std::unique_ptr requestShapifier, * Writes telemetry to the telemetry store for the operation identified by `telemetryKey`. */ void writeTelemetry(OperationContext* opCtx, + boost::optional telemetryKeyHash, boost::optional telemetryKey, std::unique_ptr requestShapifier, uint64_t queryExecMicros, diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp index cbf1104d29c..8d68ee566c6 100644 --- a/src/mongo/db/query/telemetry_store_test.cpp +++ b/src/mongo/db/query/telemetry_store_test.cpp @@ -46,6 +46,11 @@ namespace mongo::telemetry { std::string applyHmacForTest(StringData s) { return str::stream() << "HASH<" << s << ">"; } + +std::size_t hash(const BSONObj& obj) { + return absl::hash_internal::CityHash64(obj.objdata(), obj.objsize()); +} + class TelemetryStoreTest : public ServiceContextTest { public: BSONObj makeTelemetryKeyFindRequest( @@ -73,17 +78,18 @@ public: TEST_F(TelemetryStoreTest, BasicUsage) { TelemetryStore telStore{5000000, 1000}; - auto getMetrics = [&](BSONObj& key) { - auto lookupResult = telStore.lookup(key); + auto getMetrics = [&](const BSONObj& key) { + auto lookupResult = telStore.lookup(hash(key)); return *lookupResult.getValue(); }; auto collectMetrics = [&](BSONObj& key) { std::shared_ptr metrics; - auto lookupResult = telStore.lookup(key); + auto lookupResult = telStore.lookup(hash(key)); if (!lookupResult.isOK()) { - telStore.put(key, std::make_shared(nullptr, NamespaceString{})); - lookupResult = telStore.lookup(key); + telStore.put(hash(key), + std::make_shared(nullptr, NamespaceString{}, key)); + lookupResult = telStore.lookup(hash(key)); } metrics = *lookupResult.getValue(); metrics->execCount += 1; @@ -105,7 +111,7 @@ TEST_F(TelemetryStoreTest, BasicUsage) { ASSERT_EQ(getMetrics(query2)->execCount, 1); auto collectMetricsWithLock = [&](BSONObj& key) { - auto [lookupResult, lock] = telStore.getWithPartitionLock(key); + auto [lookupResult, lock] = telStore.getWithPartitionLock(hash(key)); auto metrics = *lookupResult.getValue(); metrics->execCount += 1; metrics->lastExecutionMicros += 123456; @@ -121,7 +127,7 @@ TEST_F(TelemetryStoreTest, BasicUsage) { int numKeys = 0; telStore.forEach( - [&](const BSONObj& key, const std::shared_ptr& entry) { numKeys++; }); + [&](std::size_t key, const std::shared_ptr& entry) { numKeys++; }); ASSERT_EQ(numKeys, 2); } @@ -135,13 +141,15 @@ TEST_F(TelemetryStoreTest, EvictEntries) { for (int i = 0; i < 20; i++) { auto query = BSON("query" + std::to_string(i) << 1 << "xEquals" << 42); - telStore.put(query, std::make_shared(nullptr, NamespaceString{})); + telStore.put(hash(query), + std::make_shared(nullptr, NamespaceString{}, BSONObj{})); } int numKeys = 0; telStore.forEach( - [&](const BSONObj& key, const std::shared_ptr& entry) { numKeys++; }); + [&](std::size_t key, const std::shared_ptr& entry) { numKeys++; }); - int entriesPerPartition = (cacheSize / numPartitions) / (46 + sizeof(TelemetryEntry)); + int entriesPerPartition = (cacheSize / numPartitions) / + (sizeof(std::size_t) + sizeof(TelemetryEntry) + BSONObj().objsize()); ASSERT_EQ(numKeys, entriesPerPartition * numPartitions); } @@ -683,33 +691,38 @@ TEST_F(TelemetryStoreTest, DefinesLetVariables) { const auto cmdObj = fcr.toBSON(BSON("$db" << "testDB")); TelemetryEntry testMetrics{std::make_unique(fcr, opCtx.get()), - fcr.getNamespaceOrUUID()}; + fcr.getNamespaceOrUUID(), + cmdObj}; bool applyHmacToIdentifiers = false; auto hmacApplied = - testMetrics.makeTelemetryKey(cmdObj, applyHmacToIdentifiers, std::string{}, opCtx.get()); - // As the query never moves through registerFindRequest and hmac is not enabled, - // makeTelemetryKey() never gets called and consequently the query never gets shapified. + testMetrics.computeTelemetryKey(opCtx.get(), applyHmacToIdentifiers, std::string{}); ASSERT_BSONOBJ_EQ_AUTO( // NOLINT R"({ - "find": "testColl", - "filter": { - "$expr": [ - { - "$eq": [ - "$a", - "$$var" - ] - } - ] - }, - "projection": { - "varIs": "$$var" - }, - "let": { - "var": 2 - }, - "$db": "testDB" + "queryShape": { + "cmdNs": { + "db": "testDB", + "coll": "testColl" + }, + "command": "find", + "filter": { + "$expr": [ + { + "$eq": [ + "$a", + "$$var" + ] + } + ] + }, + "let": { + "var": "?number" + }, + "projection": { + "varIs": "$$var", + "_id": true + } + } })", hmacApplied); @@ -717,7 +730,7 @@ TEST_F(TelemetryStoreTest, DefinesLetVariables) { // do the hashing, so we'll just stick with the big long strings here for now. applyHmacToIdentifiers = true; hmacApplied = - testMetrics.makeTelemetryKey(cmdObj, applyHmacToIdentifiers, std::string{}, opCtx.get()); + testMetrics.computeTelemetryKey(opCtx.get(), applyHmacToIdentifiers, std::string{}); ASSERT_BSONOBJ_EQ_AUTO( // NOLINT R"({ "queryShape": { diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 9cc44be6811..939637d0f32 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -75,6 +75,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, _lastUseDate(_createdDate), _queryHash(CurOp::get(opCtx)->debug().queryHash), _shouldOmitDiagnosticInformation(CurOp::get(opCtx)->debug().shouldOmitDiagnosticInformation), + _telemetryStoreKeyHash(CurOp::get(opCtx)->debug().telemetryStoreKeyHash), _telemetryStoreKey(CurOp::get(opCtx)->debug().telemetryStoreKey), _telemetryRequestShapifier(std::move(CurOp::get(opCtx)->debug().telemetryRequestShapifier)) { dassert(!_params.compareWholeSortKeyOnRouter || @@ -136,8 +137,9 @@ void ClusterClientCursorImpl::kill(OperationContext* opCtx) { "Cannot kill a cluster client cursor that has already been killed", !_hasBeenKilled); - if (_telemetryStoreKey && opCtx) { + if (_telemetryStoreKeyHash && opCtx) { telemetry::writeTelemetry(opCtx, + _telemetryStoreKeyHash, _telemetryStoreKey, std::move(_telemetryRequestShapifier), _metrics.executionTime.value_or(Microseconds{0}).count(), diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 8f23c25ff02..ecb7535715c 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -32,6 +32,7 @@ #include #include +#include "mongo/bson/bsonobj.h" #include "mongo/executor/task_executor.h" #include "mongo/s/query/cluster_client_cursor.h" #include "mongo/s/query/cluster_client_cursor_guard.h" @@ -185,6 +186,8 @@ private: bool _shouldOmitDiagnosticInformation = false; // If boost::none, telemetry should not be collected for this cursor. + boost::optional _telemetryStoreKeyHash; + // TODO: SERVER-73152 remove telemetryStoreKey when RequestShapifier is used for agg. boost::optional _telemetryStoreKey; // The RequestShapifier used by telemetry to shapify the request payload into the telemetry // store key. diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 1cfbd7bf700..d8e47e55ecf 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -598,6 +598,7 @@ void collectTelemetryMongos(OperationContext* opCtx, auto&& opDebug = CurOp::get(opCtx)->debug(); telemetry::writeTelemetry( opCtx, + opDebug.telemetryStoreKeyHash, opDebug.telemetryStoreKey, std::move(requestShapifier), opDebug.additiveMetrics.executionTime.value_or(Microseconds{0}).count(), -- cgit v1.2.1