summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJess Balint <jbalint@gmail.com>2022-11-28 19:13:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-28 19:54:55 +0000
commit507288cdeed26adbdab4eccd66cb48b73d96559e (patch)
tree74ed81ec75b1131af4529ad1bb9b9caf139f6183
parentc24ee3b98bdf06bed6602f058df1e137997f9b82 (diff)
downloadmongo-507288cdeed26adbdab4eccd66cb48b73d96559e.tar.gz
SERVER-71390 locking fix
-rw-r--r--src/mongo/db/pipeline/document_source_telemetry.cpp8
-rw-r--r--src/mongo/db/query/telemetry.cpp138
-rw-r--r--src/mongo/db/query/telemetry.h35
-rw-r--r--src/mongo/db/query/telemetry_store_test.cpp36
4 files changed, 132 insertions, 85 deletions
diff --git a/src/mongo/db/pipeline/document_source_telemetry.cpp b/src/mongo/db/pipeline/document_source_telemetry.cpp
index 812261edc1d..81e326c841a 100644
--- a/src/mongo/db/pipeline/document_source_telemetry.cpp
+++ b/src/mongo/db/pipeline/document_source_telemetry.cpp
@@ -80,14 +80,12 @@ Value DocumentSourceTelemetry::serialize(boost::optional<ExplainOptions::Verbosi
}
void DocumentSourceTelemetry::buildTelemetryStoreIterator() {
- TelemetryStore* telemetryStore = [&]() {
- return getTelemetryStoreForRead(getContext()->opCtx->getServiceContext()).first;
- }();
+ auto&& sharedTelemetryStore = [&]() { return getTelemetryStoreForRead(getContext()->opCtx); }();
// Here we start a new thread which runs until the document source finishes iterating the
// telemetry store.
- stdx::thread producer([&, telemetryStore] {
- telemetryStore->forEachPartition(
+ stdx::thread producer([&, sharedTelemetryStore = std::move(sharedTelemetryStore)] {
+ sharedTelemetryStore->forEachPartition(
[&](const std::function<TelemetryStore::Partition()>& getPartition) {
// Block here waiting for the queue to be empty. Locking the partition will block
// telemetry writers. We want to delay lock acquisition as long as possible.
diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp
index 05b9a46b351..22c0dbd506f 100644
--- a/src/mongo/db/query/telemetry.cpp
+++ b/src/mongo/db/query/telemetry.cpp
@@ -32,7 +32,6 @@
#include "mongo/crypto/hash_block.h"
#include "mongo/crypto/sha256_block.h"
#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
@@ -44,7 +43,6 @@
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/util/system_clock_source.h"
-#include <cstddef>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@@ -55,6 +53,33 @@ namespace telemetry {
namespace {
/**
+ * Cap the telemetry store size.
+ */
+size_t capTelemetryStoreSize(size_t requestedSize) {
+ size_t cappedStoreSize = memory_util::capMemorySize(
+ requestedSize /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
+ // If capped size is less than requested size, the telemetry store has been capped at its
+ // upper limit.
+ if (cappedStoreSize < requestedSize) {
+ LOGV2_DEBUG(7106502,
+ 1,
+ "The telemetry store size has been capped",
+ "cappedSize"_attr = cappedStoreSize);
+ }
+ return cappedStoreSize;
+}
+
+/**
+ * Get the telemetry store size based on the query job's value.
+ */
+size_t getTelemetryStoreSize() {
+ auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get());
+ uassertStatusOK(status);
+ size_t requestedSize = memory_util::getRequestedMemSizeInBytes(status.getValue());
+ return capTelemetryStoreSize(requestedSize);
+}
+
+/**
* A manager for the telemetry store allows a "pointer swap" on the telemetry store itself. The
* usage patterns are as follows:
*
@@ -67,19 +92,18 @@ namespace {
class TelemetryStoreManager {
public:
template <typename... TelemetryStoreArgs>
- TelemetryStoreManager(ServiceContext* serviceContext, TelemetryStoreArgs... args)
+ TelemetryStoreManager(TelemetryStoreArgs... args)
: _telemetryStore(
- std::make_unique<TelemetryStore>(std::forward<TelemetryStoreArgs>(args)...)),
- _instanceLock(LockerImpl{serviceContext}),
- _instanceMutex("TelemetryStoreManager") {}
+ std::make_unique<TelemetryStore>(std::forward<TelemetryStoreArgs>(args)...)) {}
/**
* Acquire the instance of the telemetry store. The telemetry store is mutable and a shared
* "read lock" is obtained on the instance. That is, the telemetry store instance will not
* be replaced.
*/
- std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStore() {
- return std::make_pair(&*_telemetryStore, Lock::SharedLock{&_instanceLock, _instanceMutex});
+ SharedTelemetryStore getTelemetryStore(OperationContext* opCtx) {
+ Lock::SharedLock readLock = {opCtx, _instanceMutex};
+ return {_telemetryStore.get(), std::move(readLock)};
}
/**
@@ -87,82 +111,58 @@ public:
* internal instance with a new instance. This operation acquires an exclusive "write lock"
* which waits for all read locks to be released before replacing the instance.
*/
- std::unique_ptr<TelemetryStore> resetTelemetryStore() {
- Lock::ExclusiveLock writeLock{&_instanceLock, _instanceMutex};
- auto newStore = std::make_unique<TelemetryStore>(_telemetryStore->size(),
- _telemetryStore->numPartitions());
+ std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx) {
+ size_t size = getTelemetryStoreSize();
+ Lock::ExclusiveLock writeLock{opCtx, _instanceMutex};
+ auto newStore = std::make_unique<TelemetryStore>(size, _telemetryStore->numPartitions());
std::swap(_telemetryStore, newStore);
return newStore; // which is now the old store.
}
private:
- std::unique_ptr<TelemetryStore> _telemetryStore;
+ Lock::ResourceMutex _instanceMutex{"TelemetryStoreManager"};
- /**
- * Lock over the telemetry store.
- */
- LockerImpl _instanceLock;
-
- Lock::ResourceMutex _instanceMutex;
+ std::unique_ptr<TelemetryStore> _telemetryStore;
};
const auto telemetryStoreDecoration =
ServiceContext::declareDecoration<std::unique_ptr<TelemetryStoreManager>>();
+const auto telemetryRateLimiter =
+ ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>();
+
class TelemetryOnParamChangeUpdaterImpl final : public telemetry_util::OnParamChangeUpdater {
public:
void updateCacheSize(ServiceContext* serviceCtx, memory_util::MemorySize memSize) final {
- auto newSizeBytes = memory_util::getRequestedMemSizeInBytes(memSize);
- auto cappedSize = memory_util::capMemorySize(
- newSizeBytes /*requestedSize*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
-
- /* If capped size is less than requested size, the telemetry store has been capped at
- * its upper limit*/
- if (cappedSize < newSizeBytes) {
- LOGV2_DEBUG(7106503,
- 1,
- "The telemetry store size has been capped",
- "cappedSize"_attr = cappedSize);
- }
+ auto requestedSize = memory_util::getRequestedMemSizeInBytes(memSize);
+ auto cappedSize = capTelemetryStoreSize(requestedSize);
+
auto& telemetryStoreManager = telemetryStoreDecoration(serviceCtx);
- auto&& [telemetryStore, resourceLock] = telemetryStoreManager->getTelemetryStore();
- telemetryStore->reset(cappedSize);
+ auto client = Client::getCurrent();
+ uassert(7139000, "Client required to update telemetry store size", client != nullptr);
+ auto&& sharedTelemetryStore =
+ telemetryStoreManager->getTelemetryStore(client->getOperationContext());
+ sharedTelemetryStore->reset(cappedSize);
}
};
-const auto telemetryRateLimiter =
- ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>();
-
ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
"TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) {
telemetry_util::telemetryStoreOnParamChangeUpdater(serviceCtx) =
std::make_unique<TelemetryOnParamChangeUpdaterImpl>();
- auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get());
- uassertStatusOK(status);
- size_t size = memory_util::getRequestedMemSizeInBytes(status.getValue());
- size_t cappedStoreSize = memory_util::capMemorySize(
- size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
- // If capped size is less than requested size, the telemetry store has been capped at its
- // upper limit.
- if (cappedStoreSize < size) {
- LOGV2_DEBUG(7106502,
- 1,
- "The telemetry store size has been capped",
- "cappedSize"_attr = cappedStoreSize);
- }
+ size_t size = getTelemetryStoreSize();
auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx);
// Many partitions reduces lock contention on both reading and write telemetry data.
size_t numPartitions = 1024;
- size_t partitionBytes = cappedStoreSize / numPartitions;
+ size_t partitionBytes = size / numPartitions;
size_t metricsSize = sizeof(TelemetryMetrics);
if (partitionBytes < metricsSize * 10) {
- numPartitions = cappedStoreSize / metricsSize;
+ numPartitions = size / metricsSize;
if (numPartitions < 1) {
numPartitions = 1;
}
}
- globalTelemetryStoreManager =
- std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, numPartitions);
+ globalTelemetryStoreManager = std::make_unique<TelemetryStoreManager>(size, numPartitions);
telemetryRateLimiter(serviceCtx) =
std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load());
}};
@@ -225,23 +225,22 @@ void throwIfEncounteringFLEPayload(const BSONElement& e) {
*/
class LockedMetrics {
LockedMetrics(TelemetryMetrics* metrics,
- Lock::ResourceLock telemetryStoreReadLock,
+ SharedTelemetryStore sharedTelemetryStore,
TelemetryStore::Partition partitionLock)
: _metrics(metrics),
- _telemetryStoreReadLock(std::move(telemetryStoreReadLock)),
+ _sharedTelemetryStore(std::move(sharedTelemetryStore)),
_partitionLock(std::move(partitionLock)) {}
public:
- static LockedMetrics get(const OperationContext* opCtx, const BSONObj& telemetryKey) {
- auto&& [telemetryStore, telemetryStoreReadLock] =
- getTelemetryStoreForRead(opCtx->getServiceContext());
+ static LockedMetrics get(OperationContext* opCtx, const BSONObj& telemetryKey) {
+ auto&& sharedTelemetryStore = getTelemetryStoreForRead(opCtx);
auto&& [statusWithMetrics, partitionLock] =
- telemetryStore->getWithPartitionLock(telemetryKey);
+ sharedTelemetryStore->getWithPartitionLock(telemetryKey);
TelemetryMetrics* metrics;
if (statusWithMetrics.isOK()) {
metrics = statusWithMetrics.getValue();
} else {
- telemetryStore->put(telemetryKey, {}, partitionLock);
+ sharedTelemetryStore->put(telemetryKey, {}, partitionLock);
auto newMetrics = partitionLock->get(telemetryKey);
// This can happen if the budget is immediately exceeded. Specifically if the there is
// not enough room for a single new entry if the number of partitions is too high
@@ -249,7 +248,7 @@ public:
tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
metrics = &newMetrics.getValue()->second;
}
- return LockedMetrics{metrics, std::move(telemetryStoreReadLock), std::move(partitionLock)};
+ return LockedMetrics{metrics, std::move(sharedTelemetryStore), std::move(partitionLock)};
}
TelemetryMetrics* operator->() const {
@@ -259,7 +258,7 @@ public:
private:
TelemetryMetrics* _metrics;
- Lock::ResourceLock _telemetryStoreReadLock;
+ SharedTelemetryStore _sharedTelemetryStore;
TelemetryStore::Partition _partitionLock;
};
@@ -479,16 +478,17 @@ void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planEx
opCtx->storeQueryBSON(telemetryKey);
}
-std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(
- const ServiceContext* serviceCtx) {
- return telemetryStoreDecoration(serviceCtx)->getTelemetryStore();
+SharedTelemetryStore getTelemetryStoreForRead(OperationContext* opCtx) {
+ return {
+ telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore(opCtx),
+ };
}
-std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx) {
- return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore();
+std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx) {
+ return telemetryStoreDecoration(opCtx->getServiceContext())->resetTelemetryStore(opCtx);
}
-void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle) {
+void recordExecution(OperationContext* opCtx, const OpDebug& opDebug, bool isFle) {
if (isFle) {
return;
}
@@ -501,7 +501,7 @@ void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool
metrics->queryOptMicros.aggregate(opDebug.planningTime.count());
}
-void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug) {
+void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) {
auto&& telemetryKey = opCtx->getTelemetryKey();
if (telemetryKey.isEmpty()) {
return;
diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h
index 0e225d2f17e..b9dc00b3af9 100644
--- a/src/mongo/db/query/telemetry.h
+++ b/src/mongo/db/query/telemetry.h
@@ -170,12 +170,37 @@ using TelemetryStore = PartitionedCache<BSONObj,
SimpleBSONObjComparator::EqualTo>;
/**
+ * Shared instance of the telemetry store which holds a read-lock while in use. The read-lock
+ * prevents the telemetry store as a whole from being replaced. The telemetry store itself is
+ * mutable.
+ */
+class SharedTelemetryStore {
+public:
+ SharedTelemetryStore(TelemetryStore* telemetryStore, Lock::SharedLock lock)
+ : _telemetryStore(telemetryStore), _lock(std::move(lock)) {}
+
+ SharedTelemetryStore(SharedTelemetryStore&& other)
+ : _telemetryStore(other._telemetryStore), _lock(std::move(other._lock)) {}
+
+ SharedTelemetryStore(const SharedTelemetryStore&) = delete;
+ SharedTelemetryStore& operator=(const SharedTelemetryStore&) = delete;
+
+ TelemetryStore* operator->() const {
+ return _telemetryStore;
+ }
+
+private:
+ TelemetryStore* _telemetryStore;
+
+ Lock::SharedLock _lock;
+};
+
+/**
* Acquire a reference to the global telemetry store.
*/
-std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(
- const ServiceContext* serviceCtx);
+SharedTelemetryStore getTelemetryStoreForRead(OperationContext* opCtx);
-std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx);
+std::unique_ptr<TelemetryStore> resetTelemetryStore(OperationContext* opCtx);
bool isTelemetryEnabled(const ServiceContext* serviceCtx);
@@ -198,13 +223,13 @@ void registerFindRequest(const FindCommandRequest& request,
void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planExplainer);
-void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle);
+void recordExecution(OperationContext* opCtx, const OpDebug& opDebug, bool isFle);
/**
* Collect telemetry for the operation identified by `key`. The `isExec` flag should be set if it's
* the beginning of execution (first batch) of results and not set for subsequent getMore() calls.
*/
-void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug);
+void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug);
} // namespace telemetry
} // namespace mongo
diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp
index c05a6fb0aac..da4cd55ad6c 100644
--- a/src/mongo/db/query/telemetry_store_test.cpp
+++ b/src/mongo/db/query/telemetry_store_test.cpp
@@ -30,16 +30,12 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/query/telemetry.h"
+#include "mongo/db/service_context_test_fixture.h"
#include "mongo/unittest/unittest.h"
namespace mongo::telemetry {
-class TelemetryStoreTest : public unittest::Test {
-protected:
- void setUp() override {}
-
- void tearDown() override{};
-};
+class TelemetryStoreTest : public ServiceContextTest {};
TEST_F(TelemetryStoreTest, BasicUsage) {
TelemetryStore telStore{5000000, 1000};
@@ -96,4 +92,32 @@ TEST_F(TelemetryStoreTest, BasicUsage) {
ASSERT_EQ(numKeys, 2);
}
+/**
+ * Spin up multiple threads reading/writing the store to empirically verify we don't hit an
+ * invariant.
+ */
+TEST_F(TelemetryStoreTest, ReadWriteLocking) {
+ std::vector<stdx::thread> threads;
+ auto svcCtx = getServiceContext();
+ for (int i = 0; i < 20; ++i) {
+ threads.emplace_back([&, i]() {
+ auto client = svcCtx->makeClient(std::to_string(i));
+ auto opCtx = client->makeOperationContext();
+ if (i % 4 > 0) {
+ auto sharedTelemetryStore = getTelemetryStoreForRead(opCtx.get());
+ BSONObj key;
+ auto status = sharedTelemetryStore->lookup(key);
+ // Key isn't found, we just need to do something while holding the store's read
+ // lock.
+ ASSERT_NOT_OK(status);
+ } else {
+ [[maybe_unused]] auto oldStore = resetTelemetryStore(opCtx.get());
+ }
+ });
+ }
+ for (auto&& thread : threads) {
+ thread.join();
+ }
+}
+
} // namespace mongo::telemetry