From 507288cdeed26adbdab4eccd66cb48b73d96559e Mon Sep 17 00:00:00 2001 From: Jess Balint Date: Mon, 28 Nov 2022 19:13:20 +0000 Subject: SERVER-71390 locking fix --- .../db/pipeline/document_source_telemetry.cpp | 8 +- src/mongo/db/query/telemetry.cpp | 138 ++++++++++----------- src/mongo/db/query/telemetry.h | 35 +++++- src/mongo/db/query/telemetry_store_test.cpp | 36 +++++- 4 files changed, 132 insertions(+), 85 deletions(-) diff --git a/src/mongo/db/pipeline/document_source_telemetry.cpp b/src/mongo/db/pipeline/document_source_telemetry.cpp index 812261edc1d..81e326c841a 100644 --- a/src/mongo/db/pipeline/document_source_telemetry.cpp +++ b/src/mongo/db/pipeline/document_source_telemetry.cpp @@ -80,14 +80,12 @@ Value DocumentSourceTelemetry::serialize(boost::optionalopCtx->getServiceContext()).first; - }(); + auto&& sharedTelemetryStore = [&]() { return getTelemetryStoreForRead(getContext()->opCtx); }(); // Here we start a new thread which runs until the document source finishes iterating the // telemetry store. - stdx::thread producer([&, telemetryStore] { - telemetryStore->forEachPartition( + stdx::thread producer([&, sharedTelemetryStore = std::move(sharedTelemetryStore)] { + sharedTelemetryStore->forEachPartition( [&](const std::function& getPartition) { // Block here waiting for the queue to be empty. Locking the partition will block // telemetry writers. We want to delay lock acquisition as long as possible. diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index 05b9a46b351..22c0dbd506f 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -32,7 +32,6 @@ #include "mongo/crypto/hash_block.h" #include "mongo/crypto/sha256_block.h" #include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/curop.h" #include "mongo/db/pipeline/aggregate_command_gen.h" @@ -44,7 +43,6 @@ #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/util/system_clock_source.h" -#include #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -54,6 +52,33 @@ namespace telemetry { namespace { +/** + * Cap the telemetry store size. + */ +size_t capTelemetryStoreSize(size_t requestedSize) { + size_t cappedStoreSize = memory_util::capMemorySize( + requestedSize /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); + // If capped size is less than requested size, the telemetry store has been capped at its + // upper limit. + if (cappedStoreSize < requestedSize) { + LOGV2_DEBUG(7106502, + 1, + "The telemetry store size has been capped", + "cappedSize"_attr = cappedStoreSize); + } + return cappedStoreSize; +} + +/** + * Get the telemetry store size based on the query job's value. + */ +size_t getTelemetryStoreSize() { + auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get()); + uassertStatusOK(status); + size_t requestedSize = memory_util::getRequestedMemSizeInBytes(status.getValue()); + return capTelemetryStoreSize(requestedSize); +} + /** * A manager for the telemetry store allows a "pointer swap" on the telemetry store itself. The * usage patterns are as follows: @@ -67,19 +92,18 @@ namespace { class TelemetryStoreManager { public: template - TelemetryStoreManager(ServiceContext* serviceContext, TelemetryStoreArgs... args) + TelemetryStoreManager(TelemetryStoreArgs... args) : _telemetryStore( - std::make_unique(std::forward(args)...)), - _instanceLock(LockerImpl{serviceContext}), - _instanceMutex("TelemetryStoreManager") {} + std::make_unique(std::forward(args)...)) {} /** * Acquire the instance of the telemetry store. The telemetry store is mutable and a shared * "read lock" is obtained on the instance. That is, the telemetry store instance will not * be replaced. */ - std::pair getTelemetryStore() { - return std::make_pair(&*_telemetryStore, Lock::SharedLock{&_instanceLock, _instanceMutex}); + SharedTelemetryStore getTelemetryStore(OperationContext* opCtx) { + Lock::SharedLock readLock = {opCtx, _instanceMutex}; + return {_telemetryStore.get(), std::move(readLock)}; } /** @@ -87,82 +111,58 @@ public: * internal instance with a new instance. This operation acquires an exclusive "write lock" * which waits for all read locks to be released before replacing the instance. */ - std::unique_ptr resetTelemetryStore() { - Lock::ExclusiveLock writeLock{&_instanceLock, _instanceMutex}; - auto newStore = std::make_unique(_telemetryStore->size(), - _telemetryStore->numPartitions()); + std::unique_ptr resetTelemetryStore(OperationContext* opCtx) { + size_t size = getTelemetryStoreSize(); + Lock::ExclusiveLock writeLock{opCtx, _instanceMutex}; + auto newStore = std::make_unique(size, _telemetryStore->numPartitions()); std::swap(_telemetryStore, newStore); return newStore; // which is now the old store. } private: - std::unique_ptr _telemetryStore; + Lock::ResourceMutex _instanceMutex{"TelemetryStoreManager"}; - /** - * Lock over the telemetry store. - */ - LockerImpl _instanceLock; - - Lock::ResourceMutex _instanceMutex; + std::unique_ptr _telemetryStore; }; const auto telemetryStoreDecoration = ServiceContext::declareDecoration>(); +const auto telemetryRateLimiter = + ServiceContext::declareDecoration>(); + class TelemetryOnParamChangeUpdaterImpl final : public telemetry_util::OnParamChangeUpdater { public: void updateCacheSize(ServiceContext* serviceCtx, memory_util::MemorySize memSize) final { - auto newSizeBytes = memory_util::getRequestedMemSizeInBytes(memSize); - auto cappedSize = memory_util::capMemorySize( - newSizeBytes /*requestedSize*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); - - /* If capped size is less than requested size, the telemetry store has been capped at - * its upper limit*/ - if (cappedSize < newSizeBytes) { - LOGV2_DEBUG(7106503, - 1, - "The telemetry store size has been capped", - "cappedSize"_attr = cappedSize); - } + auto requestedSize = memory_util::getRequestedMemSizeInBytes(memSize); + auto cappedSize = capTelemetryStoreSize(requestedSize); + auto& telemetryStoreManager = telemetryStoreDecoration(serviceCtx); - auto&& [telemetryStore, resourceLock] = telemetryStoreManager->getTelemetryStore(); - telemetryStore->reset(cappedSize); + auto client = Client::getCurrent(); + uassert(7139000, "Client required to update telemetry store size", client != nullptr); + auto&& sharedTelemetryStore = + telemetryStoreManager->getTelemetryStore(client->getOperationContext()); + sharedTelemetryStore->reset(cappedSize); } }; -const auto telemetryRateLimiter = - ServiceContext::declareDecoration>(); - ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ "TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) { telemetry_util::telemetryStoreOnParamChangeUpdater(serviceCtx) = std::make_unique(); - auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get()); - uassertStatusOK(status); - size_t size = memory_util::getRequestedMemSizeInBytes(status.getValue()); - size_t cappedStoreSize = memory_util::capMemorySize( - size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); - // If capped size is less than requested size, the telemetry store has been capped at its - // upper limit. - if (cappedStoreSize < size) { - LOGV2_DEBUG(7106502, - 1, - "The telemetry store size has been capped", - "cappedSize"_attr = cappedStoreSize); - } + size_t size = getTelemetryStoreSize(); auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx); // Many partitions reduces lock contention on both reading and write telemetry data. size_t numPartitions = 1024; - size_t partitionBytes = cappedStoreSize / numPartitions; + size_t partitionBytes = size / numPartitions; size_t metricsSize = sizeof(TelemetryMetrics); if (partitionBytes < metricsSize * 10) { - numPartitions = cappedStoreSize / metricsSize; + numPartitions = size / metricsSize; if (numPartitions < 1) { numPartitions = 1; } } - globalTelemetryStoreManager = - std::make_unique(serviceCtx, cappedStoreSize, numPartitions); + globalTelemetryStoreManager = std::make_unique(size, numPartitions); telemetryRateLimiter(serviceCtx) = std::make_unique(queryTelemetrySamplingRate.load()); }}; @@ -225,23 +225,22 @@ void throwIfEncounteringFLEPayload(const BSONElement& e) { */ class LockedMetrics { LockedMetrics(TelemetryMetrics* metrics, - Lock::ResourceLock telemetryStoreReadLock, + SharedTelemetryStore sharedTelemetryStore, TelemetryStore::Partition partitionLock) : _metrics(metrics), - _telemetryStoreReadLock(std::move(telemetryStoreReadLock)), + _sharedTelemetryStore(std::move(sharedTelemetryStore)), _partitionLock(std::move(partitionLock)) {} public: - static LockedMetrics get(const OperationContext* opCtx, const BSONObj& telemetryKey) { - auto&& [telemetryStore, telemetryStoreReadLock] = - getTelemetryStoreForRead(opCtx->getServiceContext()); + static LockedMetrics get(OperationContext* opCtx, const BSONObj& telemetryKey) { + auto&& sharedTelemetryStore = getTelemetryStoreForRead(opCtx); auto&& [statusWithMetrics, partitionLock] = - telemetryStore->getWithPartitionLock(telemetryKey); + sharedTelemetryStore->getWithPartitionLock(telemetryKey); TelemetryMetrics* metrics; if (statusWithMetrics.isOK()) { metrics = statusWithMetrics.getValue(); } else { - telemetryStore->put(telemetryKey, {}, partitionLock); + sharedTelemetryStore->put(telemetryKey, {}, partitionLock); auto newMetrics = partitionLock->get(telemetryKey); // This can happen if the budget is immediately exceeded. Specifically if the there is // not enough room for a single new entry if the number of partitions is too high @@ -249,7 +248,7 @@ public: tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); metrics = &newMetrics.getValue()->second; } - return LockedMetrics{metrics, std::move(telemetryStoreReadLock), std::move(partitionLock)}; + return LockedMetrics{metrics, std::move(sharedTelemetryStore), std::move(partitionLock)}; } TelemetryMetrics* operator->() const { @@ -259,7 +258,7 @@ public: private: TelemetryMetrics* _metrics; - Lock::ResourceLock _telemetryStoreReadLock; + SharedTelemetryStore _sharedTelemetryStore; TelemetryStore::Partition _partitionLock; }; @@ -479,16 +478,17 @@ void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planEx opCtx->storeQueryBSON(telemetryKey); } -std::pair getTelemetryStoreForRead( - const ServiceContext* serviceCtx) { - return telemetryStoreDecoration(serviceCtx)->getTelemetryStore(); +SharedTelemetryStore getTelemetryStoreForRead(OperationContext* opCtx) { + return { + telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore(opCtx), + }; } -std::unique_ptr resetTelemetryStore(const ServiceContext* serviceCtx) { - return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore(); +std::unique_ptr resetTelemetryStore(OperationContext* opCtx) { + return telemetryStoreDecoration(opCtx->getServiceContext())->resetTelemetryStore(opCtx); } -void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle) { +void recordExecution(OperationContext* opCtx, const OpDebug& opDebug, bool isFle) { if (isFle) { return; } @@ -501,7 +501,7 @@ void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool metrics->queryOptMicros.aggregate(opDebug.planningTime.count()); } -void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug) { +void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) { auto&& telemetryKey = opCtx->getTelemetryKey(); if (telemetryKey.isEmpty()) { return; diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h index 0e225d2f17e..b9dc00b3af9 100644 --- a/src/mongo/db/query/telemetry.h +++ b/src/mongo/db/query/telemetry.h @@ -169,13 +169,38 @@ using TelemetryStore = PartitionedCache; +/** + * Shared instance of the telemetry store which holds a read-lock while in use. The read-lock + * prevents the telemetry store as a whole from being replaced. The telemetry store itself is + * mutable. + */ +class SharedTelemetryStore { +public: + SharedTelemetryStore(TelemetryStore* telemetryStore, Lock::SharedLock lock) + : _telemetryStore(telemetryStore), _lock(std::move(lock)) {} + + SharedTelemetryStore(SharedTelemetryStore&& other) + : _telemetryStore(other._telemetryStore), _lock(std::move(other._lock)) {} + + SharedTelemetryStore(const SharedTelemetryStore&) = delete; + SharedTelemetryStore& operator=(const SharedTelemetryStore&) = delete; + + TelemetryStore* operator->() const { + return _telemetryStore; + } + +private: + TelemetryStore* _telemetryStore; + + Lock::SharedLock _lock; +}; + /** * Acquire a reference to the global telemetry store. */ -std::pair getTelemetryStoreForRead( - const ServiceContext* serviceCtx); +SharedTelemetryStore getTelemetryStoreForRead(OperationContext* opCtx); -std::unique_ptr resetTelemetryStore(const ServiceContext* serviceCtx); +std::unique_ptr resetTelemetryStore(OperationContext* opCtx); bool isTelemetryEnabled(const ServiceContext* serviceCtx); @@ -198,13 +223,13 @@ void registerFindRequest(const FindCommandRequest& request, void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planExplainer); -void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle); +void recordExecution(OperationContext* opCtx, const OpDebug& opDebug, bool isFle); /** * Collect telemetry for the operation identified by `key`. The `isExec` flag should be set if it's * the beginning of execution (first batch) of results and not set for subsequent getMore() calls. */ -void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug); +void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug); } // namespace telemetry } // namespace mongo diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp index c05a6fb0aac..da4cd55ad6c 100644 --- a/src/mongo/db/query/telemetry_store_test.cpp +++ b/src/mongo/db/query/telemetry_store_test.cpp @@ -30,16 +30,12 @@ #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/query/telemetry.h" +#include "mongo/db/service_context_test_fixture.h" #include "mongo/unittest/unittest.h" namespace mongo::telemetry { -class TelemetryStoreTest : public unittest::Test { -protected: - void setUp() override {} - - void tearDown() override{}; -}; +class TelemetryStoreTest : public ServiceContextTest {}; TEST_F(TelemetryStoreTest, BasicUsage) { TelemetryStore telStore{5000000, 1000}; @@ -96,4 +92,32 @@ TEST_F(TelemetryStoreTest, BasicUsage) { ASSERT_EQ(numKeys, 2); } +/** + * Spin up multiple threads reading/writing the store to empirically verify we don't hit an + * invariant. + */ +TEST_F(TelemetryStoreTest, ReadWriteLocking) { + std::vector threads; + auto svcCtx = getServiceContext(); + for (int i = 0; i < 20; ++i) { + threads.emplace_back([&, i]() { + auto client = svcCtx->makeClient(std::to_string(i)); + auto opCtx = client->makeOperationContext(); + if (i % 4 > 0) { + auto sharedTelemetryStore = getTelemetryStoreForRead(opCtx.get()); + BSONObj key; + auto status = sharedTelemetryStore->lookup(key); + // Key isn't found, we just need to do something while holding the store's read + // lock. + ASSERT_NOT_OK(status); + } else { + [[maybe_unused]] auto oldStore = resetTelemetryStore(opCtx.get()); + } + }); + } + for (auto&& thread : threads) { + thread.join(); + } +} + } // namespace mongo::telemetry -- cgit v1.2.1