summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/telemetry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/telemetry.cpp')
-rw-r--r--src/mongo/db/query/telemetry.cpp138
1 files changed, 69 insertions, 69 deletions
diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp
index 05b9a46b351..22c0dbd506f 100644
--- a/src/mongo/db/query/telemetry.cpp
+++ b/src/mongo/db/query/telemetry.cpp
@@ -32,7 +32,6 @@
#include "mongo/crypto/hash_block.h"
#include "mongo/crypto/sha256_block.h"
#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
@@ -44,7 +43,6 @@
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/util/system_clock_source.h"
-#include <cstddef>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@@ -55,6 +53,33 @@ namespace telemetry {
namespace {
/**
+ * Cap the telemetry store size.
+ */
+size_t capTelemetryStoreSize(size_t requestedSize) {
+ size_t cappedStoreSize = memory_util::capMemorySize(
+ requestedSize /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
+ // If capped size is less than requested size, the telemetry store has been capped at its
+ // upper limit.
+ if (cappedStoreSize < requestedSize) {
+ LOGV2_DEBUG(7106502,
+ 1,
+ "The telemetry store size has been capped",
+ "cappedSize"_attr = cappedStoreSize);
+ }
+ return cappedStoreSize;
+}
+
+/**
+ * Get the telemetry store size based on the query job's value.
+ */
+size_t getTelemetryStoreSize() {
+ auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get());
+ uassertStatusOK(status);
+ size_t requestedSize = memory_util::getRequestedMemSizeInBytes(status.getValue());
+ return capTelemetryStoreSize(requestedSize);
+}
+
+/**
* A manager for the telemetry store allows a "pointer swap" on the telemetry store itself. The
* usage patterns are as follows:
*
@@ -67,19 +92,18 @@ namespace {
class TelemetryStoreManager {
public:
template <typename... TelemetryStoreArgs>
- TelemetryStoreManager(ServiceContext* serviceContext, TelemetryStoreArgs... args)
+ TelemetryStoreManager(TelemetryStoreArgs... args)
: _telemetryStore(
- std::make_unique<TelemetryStore>(std::forward<TelemetryStoreArgs>(args)...)),
- _instanceLock(LockerImpl{serviceContext}),
- _instanceMutex("TelemetryStoreManager") {}
+ std::make_unique<TelemetryStore>(std::forward<TelemetryStoreArgs>(args)...)) {}
/**
* Acquire the instance of the telemetry store. The telemetry store is mutable and a shared
* "read lock" is obtained on the instance. That is, the telemetry store instance will not
* be replaced.
*/
- std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStore() {
- return std::make_pair(&*_telemetryStore, Lock::SharedLock{&_instanceLock, _instanceMutex});
+ SharedTelemetryStore getTelemetryStore(OperationContext* opCtx) {
+ Lock::SharedLock readLock = {opCtx, _instanceMutex};
+ return {_telemetryStore.get(), std::move(readLock)};
}
/**
@@ -87,82 +111,58 @@ public:
* internal instance with a new instance. This operation acquires an exclusive "write lock"
* which waits for all read locks to be released before replacing the instance.
*/
- std::unique_ptr<TelemetryStore> resetTelemetryStore() {
- Lock::ExclusiveLock writeLock{&_instanceLock, _instanceMutex};
- auto newStore = std::make_unique<TelemetryStore>(_telemetryStore->size(),
- _telemetryStore->numPartitions());
+ std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx) {
+ size_t size = getTelemetryStoreSize();
+ Lock::ExclusiveLock writeLock{opCtx, _instanceMutex};
+ auto newStore = std::make_unique<TelemetryStore>(size, _telemetryStore->numPartitions());
std::swap(_telemetryStore, newStore);
return newStore; // which is now the old store.
}
private:
- std::unique_ptr<TelemetryStore> _telemetryStore;
+ Lock::ResourceMutex _instanceMutex{"TelemetryStoreManager"};
- /**
- * Lock over the telemetry store.
- */
- LockerImpl _instanceLock;
-
- Lock::ResourceMutex _instanceMutex;
+ std::unique_ptr<TelemetryStore> _telemetryStore;
};
const auto telemetryStoreDecoration =
ServiceContext::declareDecoration<std::unique_ptr<TelemetryStoreManager>>();
+const auto telemetryRateLimiter =
+ ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>();
+
class TelemetryOnParamChangeUpdaterImpl final : public telemetry_util::OnParamChangeUpdater {
public:
void updateCacheSize(ServiceContext* serviceCtx, memory_util::MemorySize memSize) final {
- auto newSizeBytes = memory_util::getRequestedMemSizeInBytes(memSize);
- auto cappedSize = memory_util::capMemorySize(
- newSizeBytes /*requestedSize*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
-
- /* If capped size is less than requested size, the telemetry store has been capped at
- * its upper limit*/
- if (cappedSize < newSizeBytes) {
- LOGV2_DEBUG(7106503,
- 1,
- "The telemetry store size has been capped",
- "cappedSize"_attr = cappedSize);
- }
+ auto requestedSize = memory_util::getRequestedMemSizeInBytes(memSize);
+ auto cappedSize = capTelemetryStoreSize(requestedSize);
+
auto& telemetryStoreManager = telemetryStoreDecoration(serviceCtx);
- auto&& [telemetryStore, resourceLock] = telemetryStoreManager->getTelemetryStore();
- telemetryStore->reset(cappedSize);
+ auto client = Client::getCurrent();
+ uassert(7139000, "Client required to update telemetry store size", client != nullptr);
+ auto&& sharedTelemetryStore =
+ telemetryStoreManager->getTelemetryStore(client->getOperationContext());
+ sharedTelemetryStore->reset(cappedSize);
}
};
-const auto telemetryRateLimiter =
- ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>();
-
ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
"TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) {
telemetry_util::telemetryStoreOnParamChangeUpdater(serviceCtx) =
std::make_unique<TelemetryOnParamChangeUpdaterImpl>();
- auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get());
- uassertStatusOK(status);
- size_t size = memory_util::getRequestedMemSizeInBytes(status.getValue());
- size_t cappedStoreSize = memory_util::capMemorySize(
- size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
- // If capped size is less than requested size, the telemetry store has been capped at its
- // upper limit.
- if (cappedStoreSize < size) {
- LOGV2_DEBUG(7106502,
- 1,
- "The telemetry store size has been capped",
- "cappedSize"_attr = cappedStoreSize);
- }
+ size_t size = getTelemetryStoreSize();
auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx);
// Many partitions reduces lock contention on both reading and write telemetry data.
size_t numPartitions = 1024;
- size_t partitionBytes = cappedStoreSize / numPartitions;
+ size_t partitionBytes = size / numPartitions;
size_t metricsSize = sizeof(TelemetryMetrics);
if (partitionBytes < metricsSize * 10) {
- numPartitions = cappedStoreSize / metricsSize;
+ numPartitions = size / metricsSize;
if (numPartitions < 1) {
numPartitions = 1;
}
}
- globalTelemetryStoreManager =
- std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, numPartitions);
+ globalTelemetryStoreManager = std::make_unique<TelemetryStoreManager>(size, numPartitions);
telemetryRateLimiter(serviceCtx) =
std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load());
}};
@@ -225,23 +225,22 @@ void throwIfEncounteringFLEPayload(const BSONElement& e) {
*/
class LockedMetrics {
LockedMetrics(TelemetryMetrics* metrics,
- Lock::ResourceLock telemetryStoreReadLock,
+ SharedTelemetryStore sharedTelemetryStore,
TelemetryStore::Partition partitionLock)
: _metrics(metrics),
- _telemetryStoreReadLock(std::move(telemetryStoreReadLock)),
+ _sharedTelemetryStore(std::move(sharedTelemetryStore)),
_partitionLock(std::move(partitionLock)) {}
public:
- static LockedMetrics get(const OperationContext* opCtx, const BSONObj& telemetryKey) {
- auto&& [telemetryStore, telemetryStoreReadLock] =
- getTelemetryStoreForRead(opCtx->getServiceContext());
+ static LockedMetrics get(OperationContext* opCtx, const BSONObj& telemetryKey) {
+ auto&& sharedTelemetryStore = getTelemetryStoreForRead(opCtx);
auto&& [statusWithMetrics, partitionLock] =
- telemetryStore->getWithPartitionLock(telemetryKey);
+ sharedTelemetryStore->getWithPartitionLock(telemetryKey);
TelemetryMetrics* metrics;
if (statusWithMetrics.isOK()) {
metrics = statusWithMetrics.getValue();
} else {
- telemetryStore->put(telemetryKey, {}, partitionLock);
+ sharedTelemetryStore->put(telemetryKey, {}, partitionLock);
auto newMetrics = partitionLock->get(telemetryKey);
// This can happen if the budget is immediately exceeded. Specifically if the there is
// not enough room for a single new entry if the number of partitions is too high
@@ -249,7 +248,7 @@ public:
tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
metrics = &newMetrics.getValue()->second;
}
- return LockedMetrics{metrics, std::move(telemetryStoreReadLock), std::move(partitionLock)};
+ return LockedMetrics{metrics, std::move(sharedTelemetryStore), std::move(partitionLock)};
}
TelemetryMetrics* operator->() const {
@@ -259,7 +258,7 @@ public:
private:
TelemetryMetrics* _metrics;
- Lock::ResourceLock _telemetryStoreReadLock;
+ SharedTelemetryStore _sharedTelemetryStore;
TelemetryStore::Partition _partitionLock;
};
@@ -479,16 +478,17 @@ void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planEx
opCtx->storeQueryBSON(telemetryKey);
}
-std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(
- const ServiceContext* serviceCtx) {
- return telemetryStoreDecoration(serviceCtx)->getTelemetryStore();
+SharedTelemetryStore getTelemetryStoreForRead(OperationContext* opCtx) {
+ return {
+ telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore(opCtx),
+ };
}
-std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx) {
- return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore();
+std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx) {
+ return telemetryStoreDecoration(opCtx->getServiceContext())->resetTelemetryStore(opCtx);
}
-void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle) {
+void recordExecution(OperationContext* opCtx, const OpDebug& opDebug, bool isFle) {
if (isFle) {
return;
}
@@ -501,7 +501,7 @@ void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool
metrics->queryOptMicros.aggregate(opDebug.planningTime.count());
}
-void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug) {
+void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) {
auto&& telemetryKey = opCtx->getTelemetryKey();
if (telemetryKey.isEmpty()) {
return;