/** * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #include "mongo/db/query/telemetry.h" #include "mongo/crypto/hash_block.h" #include "mongo/crypto/sha256_block.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/curop.h" #include "mongo/db/exec/projection_executor_builder.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregate_command_gen.h" #include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/query/plan_explainer.h" #include "mongo/db/query/projection_ast_util.h" #include "mongo/db/query/projection_parser.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_planner_params.h" #include "mongo/db/query/query_request_helper.h" #include "mongo/db/query/rate_limiting.h" #include "mongo/db/query/serialization_options.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/query/telemetry_util.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/util/assert_util.h" #include "mongo/util/debug_util.h" #include "mongo/util/processinfo.h" #include "mongo/util/system_clock_source.h" #include "query_shape.h" #include #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery namespace mongo { namespace telemetry { /** * Redacts all BSONObj field names as if they were paths, unless the field name is a special hint * operator. 
*/ namespace { boost::optional getApplicationName(const OperationContext* opCtx) { if (auto metadata = ClientMetadata::get(opCtx->getClient())) { return metadata->getApplicationName().toString(); } return boost::none; } } // namespace CounterMetric telemetryStoreSizeEstimateBytesMetric("telemetry.telemetryStoreSizeEstimateBytes"); namespace { CounterMetric telemetryEvictedMetric("telemetry.numEvicted"); CounterMetric telemetryRateLimitedRequestsMetric("telemetry.numRateLimitedRequests"); CounterMetric telemetryStoreWriteErrorsMetric("telemetry.numTelemetryStoreWriteErrors"); /** * Cap the telemetry store size. */ size_t capTelemetryStoreSize(size_t requestedSize) { size_t cappedStoreSize = memory_util::capMemorySize( requestedSize /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); // If capped size is less than requested size, the telemetry store has been capped at its // upper limit. if (cappedStoreSize < requestedSize) { LOGV2_DEBUG(7106502, 1, "The telemetry store size has been capped", "cappedSize"_attr = cappedStoreSize); } return cappedStoreSize; } /** * Get the telemetry store size based on the query job's value. */ size_t getTelemetryStoreSize() { auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get()); uassertStatusOK(status); size_t requestedSize = memory_util::convertToSizeInBytes(status.getValue()); return capTelemetryStoreSize(requestedSize); } /** * A manager for the telemetry store allows a "pointer swap" on the telemetry store itself. The * usage patterns are as follows: * * - Updating the telemetry store uses the `getTelemetryStore()` method. The telemetry store * instance is obtained, entries are looked up and mutated, or created anew. * - The telemetry store is "reset". This involves atomically allocating a new instance, once * there are no more updaters (readers of the store "pointer"), and returning the existing * instance. 
*/ class TelemetryStoreManager { public: template TelemetryStoreManager(size_t cacheSize, size_t numPartitions) : _telemetryStore(std::make_unique(cacheSize, numPartitions)), _maxSize(cacheSize) {} /** * Acquire the instance of the telemetry store. */ TelemetryStore& getTelemetryStore() { return *_telemetryStore; } size_t getMaxSize() { return _maxSize; } /** * Resize the telemetry store and return the number of evicted * entries. */ size_t resetSize(size_t cacheSize) { _maxSize = cacheSize; return _telemetryStore->reset(cacheSize); } private: std::unique_ptr _telemetryStore; /** * Max size of the telemetry store. Tracked here to avoid having to recompute after it's divided * up into partitions. */ size_t _maxSize; }; const auto telemetryStoreDecoration = ServiceContext::declareDecoration>(); const auto telemetryRateLimiter = ServiceContext::declareDecoration>(); class TelemetryOnParamChangeUpdaterImpl final : public telemetry_util::OnParamChangeUpdater { public: void updateCacheSize(ServiceContext* serviceCtx, memory_util::MemorySize memSize) final { auto requestedSize = memory_util::convertToSizeInBytes(memSize); auto cappedSize = capTelemetryStoreSize(requestedSize); auto& telemetryStoreManager = telemetryStoreDecoration(serviceCtx); size_t numEvicted = telemetryStoreManager->resetSize(cappedSize); telemetryEvictedMetric.increment(numEvicted); } void updateSamplingRate(ServiceContext* serviceCtx, int samplingRate) { telemetryRateLimiter(serviceCtx).get()->setSamplingRate(samplingRate); } }; ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ "TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) { // It is possible that this is called before FCV is properly set up. Setting up the store if // the flag is enabled but FCV is incorrect is safe, and guards against the FCV being // changed to a supported version later. 
if (!feature_flags::gFeatureFlagTelemetry.isEnabledAndIgnoreFCVUnsafeAtStartup()) { // featureFlags are not allowed to be changed at runtime. Therefore it's not an issue // to not create a telemetry store in ConstructorActionRegisterer at start up with the // flag off - because the flag can not be turned on at any point afterwards. telemetry_util::telemetryStoreOnParamChangeUpdater(serviceCtx) = std::make_unique(); return; } telemetry_util::telemetryStoreOnParamChangeUpdater(serviceCtx) = std::make_unique(); size_t size = getTelemetryStoreSize(); auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx); // The plan cache and telemetry store should use the same number of partitions. // That is, the number of cpu cores. size_t numPartitions = ProcessInfo::getNumCores(); size_t partitionBytes = size / numPartitions; size_t metricsSize = sizeof(TelemetryEntry); if (partitionBytes < metricsSize * 10) { numPartitions = size / metricsSize; if (numPartitions < 1) { numPartitions = 1; } } globalTelemetryStoreManager = std::make_unique(size, numPartitions); auto configuredSamplingRate = queryTelemetrySamplingRate.load(); telemetryRateLimiter(serviceCtx) = std::make_unique( configuredSamplingRate < 0 ? INT_MAX : configuredSamplingRate); }}; /** * Top-level checks for whether telemetry collection is enabled. If this returns false, we must go * no further. */ bool isTelemetryEnabled(const ServiceContext* serviceCtx) { // During initialization FCV may not yet be setup but queries could be run. We can't // check whether telemetry should be enabled without FCV, so default to not recording // those queries. // TODO SERVER-75935 Remove FCV Check. return feature_flags::gFeatureFlagTelemetry.isEnabled( serverGlobalParams.featureCompatibility) && telemetryStoreDecoration(serviceCtx)->getMaxSize() > 0; } /** * Internal check for whether we should collect metrics. 
This checks the rate limiting * configuration for a global on/off decision and, if enabled, delegates to the rate limiter. */ bool shouldCollect(const ServiceContext* serviceCtx) { // Quick escape if telemetry is turned off. if (!isTelemetryEnabled(serviceCtx)) { return false; } // Cannot collect telemetry if sampling rate is not greater than 0. Note that we do not // increment telemetryRateLimitedRequestsMetric here since telemetry is entirely disabled. if (telemetryRateLimiter(serviceCtx)->getSamplingRate() <= 0) { return false; } // Check if rate limiting allows us to collect telemetry for this request. if (telemetryRateLimiter(serviceCtx)->getSamplingRate() < INT_MAX && !telemetryRateLimiter(serviceCtx)->handleRequestSlidingWindow()) { telemetryRateLimitedRequestsMetric.increment(); return false; } return true; } /** * Add a field to the find op's telemetry key. The `value` will have hmac applied. */ void addToFindKey(BSONObjBuilder& builder, const StringData& fieldName, const BSONObj& value) { serializeBSONWhenNotEmpty(value.redact(false), fieldName, &builder); } /** * Recognize FLE payloads in a query and throw an exception if found. 
*/ void throwIfEncounteringFLEPayload(const BSONElement& e) { constexpr auto safeContentLabel = "__safeContent__"_sd; constexpr auto fieldpath = "$__safeContent__"_sd; if (e.type() == BSONType::Object) { auto fieldname = e.fieldNameStringData(); uassert(ErrorCodes::EncounteredFLEPayloadWhileApplyingHmac, "Encountered __safeContent__, or an $_internalFle operator, which indicate a " "rewritten FLE2 query.", fieldname != safeContentLabel && !fieldname.startsWith("$_internalFle"_sd)); } else if (e.type() == BSONType::String) { auto val = e.valueStringData(); uassert(ErrorCodes::EncounteredFLEPayloadWhileApplyingHmac, "Encountered $__safeContent__ fieldpath, which indicates a rewritten FLE2 query.", val != fieldpath); } else if (e.type() == BSONType::BinData && e.isBinData(BinDataType::Encrypt)) { int len; auto data = e.binData(len); uassert(ErrorCodes::EncounteredFLEPayloadWhileApplyingHmac, "FLE1 Payload encountered in expression.", len > 1 && data[1] != char(EncryptedBinDataType::kDeterministic)); } } /** * Upon reading telemetry data, we apply hmac to some keys. This is the list. See * TelemetryEntry::makeTelemetryKey(). */ const stdx::unordered_set kKeysToApplyHmac = {"pipeline", "find"}; std::string sha256HmacStringDataHasher(std::string key, const StringData& sd) { auto hashed = SHA256Block::computeHmac( (const uint8_t*)key.data(), key.size(), (const uint8_t*)sd.rawData(), sd.size()); return hashed.toString(); } std::string sha256HmacFieldNameHasher(std::string key, const BSONElement& e) { auto&& fieldName = e.fieldNameStringData(); return sha256HmacStringDataHasher(key, fieldName); } std::string constantFieldNameHasher(const BSONElement& e) { return {"###"}; } /** * Admittedly an abuse of the BSON redaction interface, we recognize FLE payloads here and avoid * collecting telemetry for the query. */ std::string fleSafeFieldNameRedactor(const BSONElement& e) { throwIfEncounteringFLEPayload(e); // Ideally we would change interface to avoid copying here. 
return e.fieldNameStringData().toString();
}

/**
 * Append the element to the builder and apply hmac to any literals within the element. The element
 * may be of any type.
 */
void appendWithAbstractedLiterals(BSONObjBuilder& builder, const BSONElement& el) {
    if (el.type() == Object) {
        builder.append(el.fieldNameStringData(), el.Obj().redact(false, fleSafeFieldNameRedactor));
    } else if (el.type() == Array) {
        // Recurse into arrays so nested literals are abstracted element by element.
        BSONObjBuilder arrayBuilder = builder.subarrayStart(fleSafeFieldNameRedactor(el));
        for (auto&& arrayElem : el.Obj()) {
            appendWithAbstractedLiterals(arrayBuilder, arrayElem);
        }
        arrayBuilder.done();
    } else {
        // Scalar: keep the (redacted) field name but replace the value outright.
        auto fieldName = fleSafeFieldNameRedactor(el);
        builder.append(fieldName, "###"_sd);
    }
}

static const StringData replacementForLiteralArgs = "?"_sd;

// Hash of a BSON key used to index the telemetry store partitions.
std::size_t hash(const BSONObj& obj) {
    return absl::hash_internal::CityHash64(obj.objdata(), obj.objsize());
}

}  // namespace

BSONObj TelemetryEntry::computeTelemetryKey(OperationContext* opCtx,
                                            bool applyHmacToIdentifiers,
                                            std::string hmacKey) const {
    // The telemetry key for find queries is generated by serializing all the command fields
    // and applying hmac if SerializationOptions indicate to do so. The resulting key is of the
    // form:
    // {
    //   queryShape: {
    //     cmdNs: {db: "...", coll: "..."},
    //     find: "...",
    //     filter: {"...": {"$eq": "?number"}},
    //   },
    //   applicationName: kHashedApplicationName
    // }
    // queryShape may include additional fields, eg hint, limit, sort, etc, depending on the
    // original query.

    // TODO SERVER-73152 incorporate aggregation request into same path so that nullptr check is
    // unnecessary
    if (requestShapifier != nullptr) {
        auto serializationOpts = applyHmacToIdentifiers ?
            SerializationOptions(
                [&](StringData sd) { return sha256HmacStringDataHasher(hmacKey, sd); },
                LiteralSerializationPolicy::kToDebugTypeString) :
            SerializationOptions(LiteralSerializationPolicy::kToDebugTypeString);
        return requestShapifier->makeTelemetryKey(serializationOpts, opCtx);
    }

    // TODO SERVER-73152 remove all special aggregation logic below
    // The telemetry key for agg queries is of the following form:
    //   { "agg": {...}, "namespace": "...", "applicationName": "...", ... }
    //
    // The part of the key we need to apply hmac to is the object in the command's element. In the
    // case of an aggregate() command, it will look something like:
    //   "pipeline" : [ { "$telemetry" : {} },
    //                  { "$addFields" : { "x" : { "$someExpr" : {} } } } ],
    // We should preserve the top-level stage names in the pipeline but apply hmac to all field
    // names of children.
    // TODO: SERVER-73152 literal and field name redaction for aggregate command.
    if (!applyHmacToIdentifiers) {
        return oldTelemetryKey;
    }
    BSONObjBuilder hmacAppliedBuilder;
    for (BSONElement e : oldTelemetryKey) {
        if ((e.type() == Object || e.type() == Array) &&
            kKeysToApplyHmac.count(e.fieldNameStringData().toString()) == 1) {
            // Applies hmac to the field names of all *children* of `obj`, while keeping the
            // immediate field names of `obj` itself intact.
            auto hmacApplicator = [&](BSONObjBuilder subObj, const BSONObj& obj) {
                for (BSONElement e2 : obj) {
                    if (e2.type() == Object) {
                        subObj.append(e2.fieldNameStringData(),
                                      e2.Obj().redact(false, [&](const BSONElement& e) {
                                          return sha256HmacFieldNameHasher(hmacKey, e);
                                      }));
                    } else {
                        subObj.append(e2);
                    }
                }
                subObj.done();
            };
            // Now we're inside the command's {} entry and want to preserve the top-level field
            // names. If it's a [pipeline] array, we redact each element in isolation.
            if (e.type() == Object) {
                hmacApplicator(hmacAppliedBuilder.subobjStart(e.fieldNameStringData()), e.Obj());
            } else {
                BSONObjBuilder subArr = hmacAppliedBuilder.subarrayStart(e.fieldNameStringData());
                for (BSONElement stage : e.Obj()) {
                    hmacApplicator(subArr.subobjStart(""), stage.Obj());
                }
            }
        } else {
            hmacAppliedBuilder.append(e);
        }
    }
    return hmacAppliedBuilder.obj();
}

// The originating command/query does not persist through the end of query execution. In order to
// pair the telemetry metrics that are collected at the end of execution with the original query, it
// is necessary to register the original query during planning and persist it after
// execution.

// During planning, registerRequest is called to serialize the query shape and context (together,
// the telemetry context) and save it to OpDebug. Moreover, as query execution may span more than
// one request/operation and OpDebug does not persist through cursor iteration, it is necessary to
// communicate the telemetry context across operations. In this way, the telemetry context is
// registered to the cursor, so upon getMore() calls, the cursor manager passes the telemetry key
// from the pinned cursor to the new OpDebug.

// Once query execution is complete, the telemetry context is grabbed from OpDebug, a telemetry key
// is generated from this and metrics are paired to this key in the telemetry store.

void registerAggRequest(const AggregateCommandRequest& request, OperationContext* opCtx) {
    if (!isTelemetryEnabled(opCtx->getServiceContext())) {
        return;
    }

    // Queries against metadata collections should never appear in telemetry data.
if (request.getNamespace().isFLE2StateCollection()) { return; } if (!shouldCollect(opCtx->getServiceContext())) { return; } BSONObjBuilder telemetryKey; BSONObjBuilder pipelineBuilder = telemetryKey.subarrayStart("pipeline"_sd); try { for (auto&& stage : request.getPipeline()) { BSONObjBuilder stageBuilder = pipelineBuilder.subobjStart("stage"_sd); appendWithAbstractedLiterals(stageBuilder, stage.firstElement()); stageBuilder.done(); } pipelineBuilder.done(); telemetryKey.append("namespace", request.getNamespace().toString()); if (request.getReadConcern()) { telemetryKey.append("readConcern", *request.getReadConcern()); } if (auto metadata = ClientMetadata::get(opCtx->getClient())) { telemetryKey.append("applicationName", metadata->getApplicationName()); } } catch (ExceptionFor&) { return; } BSONObj key = telemetryKey.obj(); CurOp::get(opCtx)->debug().telemetryStoreKeyHash = hash(key); CurOp::get(opCtx)->debug().telemetryStoreKey = key.getOwned(); } void registerRequest(std::unique_ptr requestShapifier, const NamespaceString& collection, OperationContext* opCtx, const boost::intrusive_ptr& expCtx) { if (!isTelemetryEnabled(opCtx->getServiceContext())) { return; } // Queries against metadata collections should never appear in telemetry data. 
if (collection.isFLE2StateCollection()) { return; } if (!shouldCollect(opCtx->getServiceContext())) { return; } SerializationOptions options; options.literalPolicy = LiteralSerializationPolicy::kToDebugTypeString; options.replacementForLiteralArgs = replacementForLiteralArgs; CurOp::get(opCtx)->debug().telemetryStoreKeyHash = hash(requestShapifier->makeTelemetryKey(options, expCtx)); CurOp::get(opCtx)->debug().telemetryRequestShapifier = std::move(requestShapifier); } TelemetryStore& getTelemetryStore(OperationContext* opCtx) { uassert(6579000, "Telemetry is not enabled without the feature flag on and a cache size greater than 0 " "bytes", isTelemetryEnabled(opCtx->getServiceContext())); return telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore(); } void writeTelemetry(OperationContext* opCtx, boost::optional telemetryKeyHash, boost::optional telemetryKey, std::unique_ptr requestShapifier, const uint64_t queryExecMicros, const uint64_t docsReturned) { if (!telemetryKeyHash) { return; } auto&& telemetryStore = getTelemetryStore(opCtx); auto&& [statusWithMetrics, partitionLock] = telemetryStore.getWithPartitionLock(*telemetryKeyHash); std::shared_ptr metrics; if (statusWithMetrics.isOK()) { metrics = *statusWithMetrics.getValue(); } else { BSONObj key = telemetryKey.value_or(BSONObj{}); size_t numEvicted = telemetryStore.put(*telemetryKeyHash, std::make_shared( std::move(requestShapifier), CurOp::get(opCtx)->getNSS(), key), partitionLock); telemetryEvictedMetric.increment(numEvicted); auto newMetrics = partitionLock->get(*telemetryKeyHash); if (!newMetrics.isOK()) { // This can happen if the budget is immediately exceeded. Specifically if the there is // not enough room for a single new entry if the number of partitions is too high // relative to the size. 
telemetryStoreWriteErrorsMetric.increment(); LOGV2_DEBUG(7560900, 1, "Failed to store telemetry entry.", "status"_attr = newMetrics.getStatus(), "telemetryKeyHash"_attr = telemetryKeyHash); return; } metrics = newMetrics.getValue()->second; } metrics->lastExecutionMicros = queryExecMicros; metrics->execCount++; metrics->queryExecMicros.aggregate(queryExecMicros); metrics->docsReturned.aggregate(docsReturned); } } // namespace telemetry } // namespace mongo