diff options
Diffstat (limited to 'src/mongo/db/query/telemetry.cpp')
-rw-r--r-- | src/mongo/db/query/telemetry.cpp | 138 |
1 files changed, 69 insertions, 69 deletions
diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index 05b9a46b351..22c0dbd506f 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -32,7 +32,6 @@ #include "mongo/crypto/hash_block.h" #include "mongo/crypto/sha256_block.h" #include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/curop.h" #include "mongo/db/pipeline/aggregate_command_gen.h" @@ -44,7 +43,6 @@ #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/util/system_clock_source.h" -#include <cstddef> #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -55,6 +53,33 @@ namespace telemetry { namespace { /** + * Cap the telemetry store size. + */ +size_t capTelemetryStoreSize(size_t requestedSize) { + size_t cappedStoreSize = memory_util::capMemorySize( + requestedSize /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); + // If capped size is less than requested size, the telemetry store has been capped at its + // upper limit. + if (cappedStoreSize < requestedSize) { + LOGV2_DEBUG(7106502, + 1, + "The telemetry store size has been capped", + "cappedSize"_attr = cappedStoreSize); + } + return cappedStoreSize; +} + +/** + * Get the telemetry store size based on the query job's value. + */ +size_t getTelemetryStoreSize() { + auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get()); + uassertStatusOK(status); + size_t requestedSize = memory_util::getRequestedMemSizeInBytes(status.getValue()); + return capTelemetryStoreSize(requestedSize); +} + +/** * A manager for the telemetry store allows a "pointer swap" on the telemetry store itself. The * usage patterns are as follows: * @@ -67,19 +92,18 @@ namespace { class TelemetryStoreManager { public: template <typename... TelemetryStoreArgs> - TelemetryStoreManager(ServiceContext* serviceContext, TelemetryStoreArgs... args) + TelemetryStoreManager(TelemetryStoreArgs... args) : _telemetryStore( - std::make_unique<TelemetryStore>(std::forward<TelemetryStoreArgs>(args)...)), - _instanceLock(LockerImpl{serviceContext}), - _instanceMutex("TelemetryStoreManager") {} + std::make_unique<TelemetryStore>(std::forward<TelemetryStoreArgs>(args)...)) {} /** * Acquire the instance of the telemetry store. The telemetry store is mutable and a shared * "read lock" is obtained on the instance. That is, the telemetry store instance will not * be replaced. */ - std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStore() { - return std::make_pair(&*_telemetryStore, Lock::SharedLock{&_instanceLock, _instanceMutex}); + SharedTelemetryStore getTelemetryStore(OperationContext* opCtx) { + Lock::SharedLock readLock = {opCtx, _instanceMutex}; + return {_telemetryStore.get(), std::move(readLock)}; } /** @@ -87,82 +111,58 @@ public: * internal instance with a new instance. This operation acquires an exclusive "write lock" * which waits for all read locks to be released before replacing the instance. */ - std::unique_ptr<TelemetryStore> resetTelemetryStore() { - Lock::ExclusiveLock writeLock{&_instanceLock, _instanceMutex}; - auto newStore = std::make_unique<TelemetryStore>(_telemetryStore->size(), - _telemetryStore->numPartitions()); + std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx) { + size_t size = getTelemetryStoreSize(); + Lock::ExclusiveLock writeLock{opCtx, _instanceMutex}; + auto newStore = std::make_unique<TelemetryStore>(size, _telemetryStore->numPartitions()); std::swap(_telemetryStore, newStore); return newStore; // which is now the old store. } private: - std::unique_ptr<TelemetryStore> _telemetryStore; + Lock::ResourceMutex _instanceMutex{"TelemetryStoreManager"}; - /** - * Lock over the telemetry store. - */ - LockerImpl _instanceLock; - - Lock::ResourceMutex _instanceMutex; + std::unique_ptr<TelemetryStore> _telemetryStore; }; const auto telemetryStoreDecoration = ServiceContext::declareDecoration<std::unique_ptr<TelemetryStoreManager>>(); +const auto telemetryRateLimiter = + ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>(); + class TelemetryOnParamChangeUpdaterImpl final : public telemetry_util::OnParamChangeUpdater { public: void updateCacheSize(ServiceContext* serviceCtx, memory_util::MemorySize memSize) final { - auto newSizeBytes = memory_util::getRequestedMemSizeInBytes(memSize); - auto cappedSize = memory_util::capMemorySize( - newSizeBytes /*requestedSize*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); - - /* If capped size is less than requested size, the telemetry store has been capped at - * its upper limit*/ - if (cappedSize < newSizeBytes) { - LOGV2_DEBUG(7106503, - 1, - "The telemetry store size has been capped", - "cappedSize"_attr = cappedSize); - } + auto requestedSize = memory_util::getRequestedMemSizeInBytes(memSize); + auto cappedSize = capTelemetryStoreSize(requestedSize); + auto& telemetryStoreManager = telemetryStoreDecoration(serviceCtx); - auto&& [telemetryStore, resourceLock] = telemetryStoreManager->getTelemetryStore(); - telemetryStore->reset(cappedSize); + auto client = Client::getCurrent(); + uassert(7139000, "Client required to update telemetry store size", client != nullptr); + auto&& sharedTelemetryStore = + telemetryStoreManager->getTelemetryStore(client->getOperationContext()); + sharedTelemetryStore->reset(cappedSize); } }; -const auto telemetryRateLimiter = - ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>(); - ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ "TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) { telemetry_util::telemetryStoreOnParamChangeUpdater(serviceCtx) = std::make_unique<TelemetryOnParamChangeUpdaterImpl>(); - auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get()); - uassertStatusOK(status); - size_t size = memory_util::getRequestedMemSizeInBytes(status.getValue()); - size_t cappedStoreSize = memory_util::capMemorySize( - size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); - // If capped size is less than requested size, the telemetry store has been capped at its - // upper limit. - if (cappedStoreSize < size) { - LOGV2_DEBUG(7106502, - 1, - "The telemetry store size has been capped", - "cappedSize"_attr = cappedStoreSize); - } + size_t size = getTelemetryStoreSize(); auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx); // Many partitions reduces lock contention on both reading and write telemetry data. size_t numPartitions = 1024; - size_t partitionBytes = cappedStoreSize / numPartitions; + size_t partitionBytes = size / numPartitions; size_t metricsSize = sizeof(TelemetryMetrics); if (partitionBytes < metricsSize * 10) { - numPartitions = cappedStoreSize / metricsSize; + numPartitions = size / metricsSize; if (numPartitions < 1) { numPartitions = 1; } } - globalTelemetryStoreManager = - std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, numPartitions); + globalTelemetryStoreManager = std::make_unique<TelemetryStoreManager>(size, numPartitions); telemetryRateLimiter(serviceCtx) = std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load()); }}; @@ -225,23 +225,22 @@ void throwIfEncounteringFLEPayload(const BSONElement& e) { */ class LockedMetrics { LockedMetrics(TelemetryMetrics* metrics, - Lock::ResourceLock telemetryStoreReadLock, + SharedTelemetryStore sharedTelemetryStore, TelemetryStore::Partition partitionLock) : _metrics(metrics), - _telemetryStoreReadLock(std::move(telemetryStoreReadLock)), + _sharedTelemetryStore(std::move(sharedTelemetryStore)), _partitionLock(std::move(partitionLock)) {} public: - static LockedMetrics get(const OperationContext* opCtx, const BSONObj& telemetryKey) { - auto&& [telemetryStore, telemetryStoreReadLock] = - getTelemetryStoreForRead(opCtx->getServiceContext()); + static LockedMetrics get(OperationContext* opCtx, const BSONObj& telemetryKey) { + auto&& sharedTelemetryStore = getTelemetryStoreForRead(opCtx); auto&& [statusWithMetrics, partitionLock] = - telemetryStore->getWithPartitionLock(telemetryKey); + sharedTelemetryStore->getWithPartitionLock(telemetryKey); TelemetryMetrics* metrics; if (statusWithMetrics.isOK()) { metrics = statusWithMetrics.getValue(); } else { - telemetryStore->put(telemetryKey, {}, partitionLock); + sharedTelemetryStore->put(telemetryKey, {}, partitionLock); auto newMetrics = partitionLock->get(telemetryKey); // This can happen if the budget is immediately exceeded. Specifically if the there is // not enough room for a single new entry if the number of partitions is too high @@ -249,7 +248,7 @@ public: tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); metrics = &newMetrics.getValue()->second; } - return LockedMetrics{metrics, std::move(telemetryStoreReadLock), std::move(partitionLock)}; + return LockedMetrics{metrics, std::move(sharedTelemetryStore), std::move(partitionLock)}; } TelemetryMetrics* operator->() const { @@ -259,7 +258,7 @@ public: private: TelemetryMetrics* _metrics; - Lock::ResourceLock _telemetryStoreReadLock; + SharedTelemetryStore _sharedTelemetryStore; TelemetryStore::Partition _partitionLock; }; @@ -479,16 +478,17 @@ void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planEx opCtx->storeQueryBSON(telemetryKey); } -std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead( - const ServiceContext* serviceCtx) { - return telemetryStoreDecoration(serviceCtx)->getTelemetryStore(); +SharedTelemetryStore getTelemetryStoreForRead(OperationContext* opCtx) { + return { + telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore(opCtx), + }; } -std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx) { - return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore(); +std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx) { + return telemetryStoreDecoration(opCtx->getServiceContext())->resetTelemetryStore(opCtx); } -void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle) { +void recordExecution(OperationContext* opCtx, const OpDebug& opDebug, bool isFle) { if (isFle) { return; } @@ -501,7 +501,7 @@ void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool metrics->queryOptMicros.aggregate(opDebug.planningTime.count()); } -void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug) { +void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) { auto&& telemetryKey = opCtx->getTelemetryKey(); if (telemetryKey.isEmpty()) { return; |