diff options
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/curop.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/query/telemetry.cpp | 167 | ||||
-rw-r--r-- | src/mongo/db/query/telemetry.h | 28 |
7 files changed, 139 insertions, 110 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 64e75bc738f..66e9eae26d8 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -198,6 +198,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/query/command_request_response', + '$BUILD_DIR/mongo/db/query/telemetry', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/timer_stats', '$BUILD_DIR/mongo/db/storage/storage_engine_parameters', diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 5b623ca7daa..13cf5cff30c 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -566,6 +566,11 @@ public: cq->setUseCqfIfEligible(true); + if (collection) { + telemetry::registerFindRequest( + cq->getFindCommandRequest(), collection.get()->ns(), opCtx); + } + // Get the execution plan for the query. bool permitYield = true; auto exec = @@ -723,17 +728,7 @@ public: query_request_helper::validateCursorResponse(result->getBodyBuilder().asTempObj(), nss.tenantId()); - auto telemetryKey = - telemetry::shouldCollectTelemetry(originalFC, collection.get()->ns(), opCtx); - - // FLE2 queries should not be included in telemetry, so make sure that we did not - // rewrite this query before collecting telemetry. - if (telemetryKey && !_didDoFLERewrite) { - opCtx->storeQueryBSON(*telemetryKey); - - telemetry::collectTelemetry( - opCtx->getServiceContext(), *telemetryKey, CurOp::get(opCtx)->debug(), true); - } + telemetry::recordExecution(opCtx, CurOp::get(opCtx)->debug(), _didDoFLERewrite); } void appendMirrorableRequest(BSONObjBuilder* bob) const override { diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index e5f62335112..7f7dc01b6fa 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -561,6 +561,8 @@ public: exec->reattachToOperationContext(opCtx); exec->restoreState(readLock ? &readLock->getCollection() : nullptr); + telemetry::registerGetMoreRequest(opCtx, exec->getPlanExplainer()); + auto planSummary = exec->getPlanExplainer().getPlanSummary(); { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -693,15 +695,6 @@ public: // response batch. curOp->debug().nreturned = numResults; - auto telemetryKey = telemetry::shouldCollectTelemetry( - opCtx, exec->getPlanExplainer().getTelemetryKey()); - if (telemetryKey) { - telemetry::collectTelemetry(opCtx->getServiceContext(), - exec->getPlanExplainer().getTelemetryKey(), - curOp->debug(), - false); - } - if (respondWithId) { cursorDeleter.dismiss(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 493c9a9544c..fc6c914e603 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -725,7 +725,8 @@ Status runAggregate(OperationContext* opCtx, boost::intrusive_ptr<ExpressionContext> expCtx; auto curOp = CurOp::get(opCtx); auto catalog = CollectionCatalog::get(opCtx); - boost::optional<BSONObj> telemetryKey; + + telemetry::registerAggRequest(request, opCtx); // Since we remove encryptionInformation after rewriting a FLE2 query, this boolean keeps track // of whether the input query did originally have enryption information. @@ -1101,22 +1102,7 @@ Status runAggregate(OperationContext* opCtx, curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; - // FLE2 queries should not be included in telemetry, so make sure that we did not - // rewrite this query before collecting telemetry. - if (!didDoFLERewrite) { - telemetryKey = telemetry::shouldCollectTelemetry(request, opCtx); - // Build the telemetry key and store it in the operation context - if (telemetryKey) { - // TODO SERVER-71315: should we store it in the CurOp instead? (or even - // PlanExplainer) - opCtx->storeQueryBSON(*telemetryKey); - } - - if (telemetryKey) { - telemetry::collectTelemetry( - opCtx->getServiceContext(), *telemetryKey, curOp->debug(), true); - } - } + telemetry::recordExecution(opCtx, curOp->debug(), didDoFLERewrite); // For an optimized away pipeline, signal the cache that a query operation has completed. // For normal pipelines this is done in DocumentSourceCursor. diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index 63c2ac88dcb..f961ab83de4 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -46,6 +46,8 @@ #include "mongo/db/profile_filter.h" #include "mongo/db/query/getmore_command_gen.h" #include "mongo/db/query/plan_summary_stats.h" +#include "mongo/db/query/telemetry.h" +#include "mongo/db/stats/timer_stats.h" #include "mongo/db/storage/storage_engine_parameters_gen.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" @@ -57,7 +59,6 @@ #include "mongo/util/net/socket_utils.h" #include "mongo/util/str.h" #include "mongo/util/system_tick_source.h" -#include <mongo/db/stats/timer_stats.h> #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault @@ -435,6 +436,8 @@ bool CurOp::completeAndLogOperation(OperationContext* opCtx, _debug.executionTime = duration_cast<Microseconds>(elapsedTimeExcludingPauses()); const auto executionTimeMillis = durationCount<Milliseconds>(_debug.executionTime); + telemetry::collectTelemetry(opCtx, CurOp::get(opCtx)->debug()); + if (_debug.isReplOplogGetMore) { oplogGetMoreStats.recordMillis(executionTimeMillis); } diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index 43f436aa920..7af325183f1 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -43,7 +43,6 @@ #include "mongo/db/query/telemetry_util.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" -#include "mongo/util/processinfo.h" #include "mongo/util/system_clock_source.h" #include <cstddef> @@ -140,8 +139,8 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ std::make_unique<TelemetryOnParamChangeUpdaterImpl>(); auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get()); uassertStatusOK(status); - auto size = memory_util::getRequestedMemSizeInBytes(status.getValue()); - auto cappedStoreSize = memory_util::capMemorySize( + size_t size = memory_util::getRequestedMemSizeInBytes(status.getValue()); + size_t cappedStoreSize = memory_util::capMemorySize( size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); // If capped size is less than requested size, the telemetry store has been capped at its // upper limit. @@ -152,15 +151,18 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ "cappedSize"_attr = cappedStoreSize); } auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx); - const int kNumPartitions = 100; // the more the merrier. + // Many partitions reduces lock contention on both reading and write telemetry data. + size_t numPartitions = 1024; + size_t partitionBytes = cappedStoreSize / numPartitions; + size_t metricsSize = sizeof(TelemetryMetrics); + if (partitionBytes < metricsSize * 10) { + numPartitions = cappedStoreSize / metricsSize; + if (numPartitions < 1) { + numPartitions = 1; + } + } globalTelemetryStoreManager = - std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, kNumPartitions); - // TODO there will be a rate limiter initialized somewhere, and we can get the value from - // there to save a .load(). We need the rate limiter to do rate limiting here anyway. int - // samplingRate = queryTelemetrySamplingRate.load(); Quick escape if it's turned off? if - // (!samplingRate) { - // return; - //} + std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, numPartitions); telemetryRateLimiter(serviceCtx) = std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load()); }}; @@ -178,7 +180,7 @@ bool shouldCollect(const ServiceContext* serviceCtx) { if (!isTelemetryEnabled(serviceCtx)) { return false; } - // Check if rate limiting allows us to accumulate. + // Check if rate limiting allows us to collect telemetry for this request. if (!telemetryRateLimiter(serviceCtx)->handleRequestSlidingWindow()) { return false; } @@ -217,21 +219,64 @@ void throwIfEncounteringFLEPayload(BSONElement e) { } } +/** + * Get the metrics for a given key holding the appropriate locks. + */ +class LockedMetrics { + LockedMetrics(TelemetryMetrics* metrics, + Lock::ResourceLock telemetryStoreReadLock, + TelemetryStore::Partition partitionLock) + : _metrics(metrics), + _telemetryStoreReadLock(std::move(telemetryStoreReadLock)), + _partitionLock(std::move(partitionLock)) {} + +public: + static LockedMetrics get(const OperationContext* opCtx, const BSONObj& telemetryKey) { + auto&& [telemetryStore, telemetryStoreReadLock] = + getTelemetryStoreForRead(opCtx->getServiceContext()); + auto&& [statusWithMetrics, partitionLock] = + telemetryStore->getWithPartitionLock(telemetryKey); + TelemetryMetrics* metrics; + if (statusWithMetrics.isOK()) { + metrics = statusWithMetrics.getValue(); + } else { + telemetryStore->put(telemetryKey, {}, partitionLock); + auto newMetrics = partitionLock->get(telemetryKey); + // This can happen if the budget is immediately exceeded. Specifically if the there is + // not enough room for a single new entry if the number of partitions is too high + // relative to the size. + tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); + metrics = &newMetrics.getValue()->second; + } + return LockedMetrics{metrics, std::move(telemetryStoreReadLock), std::move(partitionLock)}; + } + + TelemetryMetrics* operator->() const { + return _metrics; + } + +private: + TelemetryMetrics* _metrics; + + Lock::ResourceLock _telemetryStoreReadLock; + + TelemetryStore::Partition _partitionLock; +}; + } // namespace -boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request, - const OperationContext* opCtx) { +void registerAggRequest(const AggregateCommandRequest& request, OperationContext* opCtx) { if (request.getEncryptionInformation()) { - return {}; + return; } // Queries against metadata collections should never appear in telemetry data. if (request.getNamespace().isFLE2StateCollection()) { - return {}; + return; } if (!shouldCollect(opCtx->getServiceContext())) { - return {}; + return; } BSONObjBuilder telemetryKey; @@ -252,25 +297,41 @@ boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& r telemetryKey.append("applicationName", metadata->getApplicationName()); } } catch (ExceptionFor<ErrorCodes::EncounteredFLEPayloadWhileRedacting>&) { - return {}; + return; } - return {telemetryKey.obj()}; + opCtx->storeQueryBSON(telemetryKey.obj()); + // Management of the telemetry key works as follows. + // + // Query execution potentially spans more than one request/operation. For this reason, we need a + // mechanism to communicate the context (the telemetry key) across operations on the same query. + // In order to accomplish this, we store the telemetry key in the plan explainer which exists + // for the entire life of the query. + // + // - Telemetry key must be stored in the OperationContext before the PlanExecutor is created. + // This is accomplished by calling registerXXXRequest() in run_aggregate.cpp and + // find_cmd.cpp before the PlanExecutor is created. + // + // - During collectTelemetry(), the telemetry key is retrieved from the OperationContext to + // write metrics into the telemetry store. This is done at the end of the operation. + // + // - Upon getMore() calls, registerGetMoreRequest() copy the telemetry key from the + // PlanExplainer to the OperationContext. } -boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request, - const NamespaceString& collection, - const OperationContext* opCtx) { +void registerFindRequest(const FindCommandRequest& request, + const NamespaceString& collection, + OperationContext* opCtx) { if (request.getEncryptionInformation()) { - return {}; + return; } // Queries against metadata collections should never appear in telemetry data. if (collection.isFLE2StateCollection()) { - return {}; + return; } if (!shouldCollect(opCtx->getServiceContext())) { - return {}; + return; } BSONObjBuilder telemetryKey; @@ -293,17 +354,17 @@ boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& reques telemetryKey.append("applicationName", metadata->getApplicationName()); } } catch (ExceptionFor<ErrorCodes::EncounteredFLEPayloadWhileRedacting>&) { - return {}; + return; } - return {telemetryKey.obj()}; + opCtx->storeQueryBSON(telemetryKey.obj()); } -boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx, - const BSONObj& telemetryKey) { +void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planExplainer) { + auto&& telemetryKey = planExplainer.getTelemetryKey(); if (telemetryKey.isEmpty() || !shouldCollect(opCtx->getServiceContext())) { - return {}; + return; } - return {telemetryKey}; + opCtx->storeQueryBSON(telemetryKey); } std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead( @@ -315,33 +376,25 @@ std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* servic return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore(); } -void collectTelemetry(const ServiceContext* serviceCtx, - const BSONObj& key, - const OpDebug& opDebug, - bool isExec) { - auto&& getTelemetryStoreResult = getTelemetryStoreForRead(serviceCtx); - auto telemetryStore = getTelemetryStoreResult.first; - auto&& result = telemetryStore->getWithPartitionLock(key); - auto statusWithMetrics = result.first; - auto partitionLock = std::move(result.second); - auto metrics = [&]() { - if (statusWithMetrics.isOK()) { - return statusWithMetrics.getValue(); - } else { - TelemetryMetrics metrics; - telemetryStore->put(key, metrics, partitionLock); - auto newMetrics = partitionLock->get(key); - // This can happen if the budget is immediately exceeded. Specifically if the there is - // not enough room for a single new entry if the number of partitions is too high - // relative to the size. - tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); - return &newMetrics.getValue()->second; - } - }(); - if (isExec) { - metrics->execCount++; - metrics->queryOptMicros.aggregate(opDebug.planningTime.count()); +void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle) { + if (isFle) { + return; + } + auto&& telemetryKey = opCtx->getTelemetryKey(); + if (telemetryKey.isEmpty()) { + return; + } + auto&& metrics = LockedMetrics::get(opCtx, telemetryKey); + metrics->execCount++; + metrics->queryOptMicros.aggregate(opDebug.planningTime.count()); +} + +void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug) { + auto&& telemetryKey = opCtx->getTelemetryKey(); + if (telemetryKey.isEmpty()) { + return; } + auto&& metrics = LockedMetrics::get(opCtx, telemetryKey); metrics->docsReturned.aggregate(opDebug.nreturned); metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0)); metrics->keysScanned.aggregate(opDebug.additiveMetrics.keysExamined.value_or(0)); diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h index 6fd59884ac4..3a2f41a7e4e 100644 --- a/src/mongo/db/query/telemetry.h +++ b/src/mongo/db/query/telemetry.h @@ -163,33 +163,31 @@ std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* servic bool isTelemetryEnabled(const ServiceContext* serviceCtx); /** - * Should we collect telemetry for a request? The decision is made based on the feature flag and - * telemetry parameters such as rate limiting. + * Register a request for telemetry collection. The telemetry machinery may decide not to collect + * anything but this should be called for all requests. The decision is made based on the feature + * flag and telemetry parameters such as rate limiting. * - * If the return value is a telemetry key in the form of BSONObj, this indicates the telemetry - * should be collected. Otherwise, telemetry should not be collected. + * The caller is still responsible for subsequently calling collectTelemetry() once the request is + * completed. * * Note that calling this affects internal state. It should be called once for each request for * which telemetry may be collected. */ -boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request, - const OperationContext* opCtx); +void registerAggRequest(const AggregateCommandRequest& request, OperationContext* opCtx); -boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request, - const NamespaceString& collection, - const OperationContext* opCtx); +void registerFindRequest(const FindCommandRequest& request, + const NamespaceString& collection, + OperationContext* opCtx); -boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx, - const BSONObj& telemetryKey); +void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planExplainer); + +void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle); /** * Collect telemetry for the operation identified by `key`. The `isExec` flag should be set if it's * the beginning of execution (first batch) of results and not set for subsequent getMore() calls. */ -void collectTelemetry(const ServiceContext* serviceCtx, - const BSONObj& key, - const OpDebug& opDebug, - bool isExec); +void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug); } // namespace telemetry } // namespace mongo |