summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/commands/find_cmd.cpp17
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp11
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp20
-rw-r--r--src/mongo/db/curop.cpp5
-rw-r--r--src/mongo/db/query/telemetry.cpp167
-rw-r--r--src/mongo/db/query/telemetry.h28
7 files changed, 139 insertions, 110 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 64e75bc738f..66e9eae26d8 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -198,6 +198,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/query/command_request_response',
+ '$BUILD_DIR/mongo/db/query/telemetry',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/timer_stats',
'$BUILD_DIR/mongo/db/storage/storage_engine_parameters',
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 5b623ca7daa..13cf5cff30c 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -566,6 +566,11 @@ public:
cq->setUseCqfIfEligible(true);
+ if (collection) {
+ telemetry::registerFindRequest(
+ cq->getFindCommandRequest(), collection.get()->ns(), opCtx);
+ }
+
// Get the execution plan for the query.
bool permitYield = true;
auto exec =
@@ -723,17 +728,7 @@ public:
query_request_helper::validateCursorResponse(result->getBodyBuilder().asTempObj(),
nss.tenantId());
- auto telemetryKey =
- telemetry::shouldCollectTelemetry(originalFC, collection.get()->ns(), opCtx);
-
- // FLE2 queries should not be included in telemetry, so make sure that we did not
- // rewrite this query before collecting telemetry.
- if (telemetryKey && !_didDoFLERewrite) {
- opCtx->storeQueryBSON(*telemetryKey);
-
- telemetry::collectTelemetry(
- opCtx->getServiceContext(), *telemetryKey, CurOp::get(opCtx)->debug(), true);
- }
+ telemetry::recordExecution(opCtx, CurOp::get(opCtx)->debug(), _didDoFLERewrite);
}
void appendMirrorableRequest(BSONObjBuilder* bob) const override {
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e5f62335112..7f7dc01b6fa 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -561,6 +561,8 @@ public:
exec->reattachToOperationContext(opCtx);
exec->restoreState(readLock ? &readLock->getCollection() : nullptr);
+ telemetry::registerGetMoreRequest(opCtx, exec->getPlanExplainer());
+
auto planSummary = exec->getPlanExplainer().getPlanSummary();
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -693,15 +695,6 @@ public:
// response batch.
curOp->debug().nreturned = numResults;
- auto telemetryKey = telemetry::shouldCollectTelemetry(
- opCtx, exec->getPlanExplainer().getTelemetryKey());
- if (telemetryKey) {
- telemetry::collectTelemetry(opCtx->getServiceContext(),
- exec->getPlanExplainer().getTelemetryKey(),
- curOp->debug(),
- false);
- }
-
if (respondWithId) {
cursorDeleter.dismiss();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 493c9a9544c..fc6c914e603 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -725,7 +725,8 @@ Status runAggregate(OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> expCtx;
auto curOp = CurOp::get(opCtx);
auto catalog = CollectionCatalog::get(opCtx);
- boost::optional<BSONObj> telemetryKey;
+
+ telemetry::registerAggRequest(request, opCtx);
// Since we remove encryptionInformation after rewriting a FLE2 query, this boolean keeps track
// of whether the input query did originally have enryption information.
@@ -1101,22 +1102,7 @@ Status runAggregate(OperationContext* opCtx,
curOp->debug().setPlanSummaryMetrics(stats);
curOp->debug().nreturned = stats.nReturned;
- // FLE2 queries should not be included in telemetry, so make sure that we did not
- // rewrite this query before collecting telemetry.
- if (!didDoFLERewrite) {
- telemetryKey = telemetry::shouldCollectTelemetry(request, opCtx);
- // Build the telemetry key and store it in the operation context
- if (telemetryKey) {
- // TODO SERVER-71315: should we store it in the CurOp instead? (or even
- // PlanExplainer)
- opCtx->storeQueryBSON(*telemetryKey);
- }
-
- if (telemetryKey) {
- telemetry::collectTelemetry(
- opCtx->getServiceContext(), *telemetryKey, curOp->debug(), true);
- }
- }
+ telemetry::recordExecution(opCtx, curOp->debug(), didDoFLERewrite);
// For an optimized away pipeline, signal the cache that a query operation has completed.
// For normal pipelines this is done in DocumentSourceCursor.
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 63c2ac88dcb..f961ab83de4 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -46,6 +46,8 @@
#include "mongo/db/profile_filter.h"
#include "mongo/db/query/getmore_command_gen.h"
#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/query/telemetry.h"
+#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/storage/storage_engine_parameters_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/client_metadata.h"
@@ -57,7 +59,6 @@
#include "mongo/util/net/socket_utils.h"
#include "mongo/util/str.h"
#include "mongo/util/system_tick_source.h"
-#include <mongo/db/stats/timer_stats.h>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
@@ -435,6 +436,8 @@ bool CurOp::completeAndLogOperation(OperationContext* opCtx,
_debug.executionTime = duration_cast<Microseconds>(elapsedTimeExcludingPauses());
const auto executionTimeMillis = durationCount<Milliseconds>(_debug.executionTime);
+ telemetry::collectTelemetry(opCtx, CurOp::get(opCtx)->debug());
+
if (_debug.isReplOplogGetMore) {
oplogGetMoreStats.recordMillis(executionTimeMillis);
}
diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp
index 43f436aa920..7af325183f1 100644
--- a/src/mongo/db/query/telemetry.cpp
+++ b/src/mongo/db/query/telemetry.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/query/telemetry_util.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/client_metadata.h"
-#include "mongo/util/processinfo.h"
#include "mongo/util/system_clock_source.h"
#include <cstddef>
@@ -140,8 +139,8 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
std::make_unique<TelemetryOnParamChangeUpdaterImpl>();
auto status = memory_util::MemorySize::parse(queryTelemetryStoreSize.get());
uassertStatusOK(status);
- auto size = memory_util::getRequestedMemSizeInBytes(status.getValue());
- auto cappedStoreSize = memory_util::capMemorySize(
+ size_t size = memory_util::getRequestedMemSizeInBytes(status.getValue());
+ size_t cappedStoreSize = memory_util::capMemorySize(
size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
// If capped size is less than requested size, the telemetry store has been capped at its
// upper limit.
@@ -152,15 +151,18 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
"cappedSize"_attr = cappedStoreSize);
}
auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx);
- const int kNumPartitions = 100; // the more the merrier.
+ // Many partitions reduces lock contention on both reading and write telemetry data.
+ size_t numPartitions = 1024;
+ size_t partitionBytes = cappedStoreSize / numPartitions;
+ size_t metricsSize = sizeof(TelemetryMetrics);
+ if (partitionBytes < metricsSize * 10) {
+ numPartitions = cappedStoreSize / metricsSize;
+ if (numPartitions < 1) {
+ numPartitions = 1;
+ }
+ }
globalTelemetryStoreManager =
- std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, kNumPartitions);
- // TODO there will be a rate limiter initialized somewhere, and we can get the value from
- // there to save a .load(). We need the rate limiter to do rate limiting here anyway. int
- // samplingRate = queryTelemetrySamplingRate.load(); Quick escape if it's turned off? if
- // (!samplingRate) {
- // return;
- //}
+ std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, numPartitions);
telemetryRateLimiter(serviceCtx) =
std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load());
}};
@@ -178,7 +180,7 @@ bool shouldCollect(const ServiceContext* serviceCtx) {
if (!isTelemetryEnabled(serviceCtx)) {
return false;
}
- // Check if rate limiting allows us to accumulate.
+ // Check if rate limiting allows us to collect telemetry for this request.
if (!telemetryRateLimiter(serviceCtx)->handleRequestSlidingWindow()) {
return false;
}
@@ -217,21 +219,64 @@ void throwIfEncounteringFLEPayload(BSONElement e) {
}
}
+/**
+ * Get the metrics for a given key holding the appropriate locks.
+ */
+class LockedMetrics {
+ LockedMetrics(TelemetryMetrics* metrics,
+ Lock::ResourceLock telemetryStoreReadLock,
+ TelemetryStore::Partition partitionLock)
+ : _metrics(metrics),
+ _telemetryStoreReadLock(std::move(telemetryStoreReadLock)),
+ _partitionLock(std::move(partitionLock)) {}
+
+public:
+ static LockedMetrics get(const OperationContext* opCtx, const BSONObj& telemetryKey) {
+ auto&& [telemetryStore, telemetryStoreReadLock] =
+ getTelemetryStoreForRead(opCtx->getServiceContext());
+ auto&& [statusWithMetrics, partitionLock] =
+ telemetryStore->getWithPartitionLock(telemetryKey);
+ TelemetryMetrics* metrics;
+ if (statusWithMetrics.isOK()) {
+ metrics = statusWithMetrics.getValue();
+ } else {
+ telemetryStore->put(telemetryKey, {}, partitionLock);
+ auto newMetrics = partitionLock->get(telemetryKey);
+ // This can happen if the budget is immediately exceeded. Specifically if the there is
+ // not enough room for a single new entry if the number of partitions is too high
+ // relative to the size.
+ tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
+ metrics = &newMetrics.getValue()->second;
+ }
+ return LockedMetrics{metrics, std::move(telemetryStoreReadLock), std::move(partitionLock)};
+ }
+
+ TelemetryMetrics* operator->() const {
+ return _metrics;
+ }
+
+private:
+ TelemetryMetrics* _metrics;
+
+ Lock::ResourceLock _telemetryStoreReadLock;
+
+ TelemetryStore::Partition _partitionLock;
+};
+
} // namespace
-boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request,
- const OperationContext* opCtx) {
+void registerAggRequest(const AggregateCommandRequest& request, OperationContext* opCtx) {
if (request.getEncryptionInformation()) {
- return {};
+ return;
}
// Queries against metadata collections should never appear in telemetry data.
if (request.getNamespace().isFLE2StateCollection()) {
- return {};
+ return;
}
if (!shouldCollect(opCtx->getServiceContext())) {
- return {};
+ return;
}
BSONObjBuilder telemetryKey;
@@ -252,25 +297,41 @@ boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& r
telemetryKey.append("applicationName", metadata->getApplicationName());
}
} catch (ExceptionFor<ErrorCodes::EncounteredFLEPayloadWhileRedacting>&) {
- return {};
+ return;
}
- return {telemetryKey.obj()};
+ opCtx->storeQueryBSON(telemetryKey.obj());
+ // Management of the telemetry key works as follows.
+ //
+ // Query execution potentially spans more than one request/operation. For this reason, we need a
+ // mechanism to communicate the context (the telemetry key) across operations on the same query.
+ // In order to accomplish this, we store the telemetry key in the plan explainer which exists
+ // for the entire life of the query.
+ //
+ // - Telemetry key must be stored in the OperationContext before the PlanExecutor is created.
+ // This is accomplished by calling registerXXXRequest() in run_aggregate.cpp and
+ // find_cmd.cpp before the PlanExecutor is created.
+ //
+ // - During collectTelemetry(), the telemetry key is retrieved from the OperationContext to
+ // write metrics into the telemetry store. This is done at the end of the operation.
+ //
+ // - Upon getMore() calls, registerGetMoreRequest() copy the telemetry key from the
+ // PlanExplainer to the OperationContext.
}
-boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request,
- const NamespaceString& collection,
- const OperationContext* opCtx) {
+void registerFindRequest(const FindCommandRequest& request,
+ const NamespaceString& collection,
+ OperationContext* opCtx) {
if (request.getEncryptionInformation()) {
- return {};
+ return;
}
// Queries against metadata collections should never appear in telemetry data.
if (collection.isFLE2StateCollection()) {
- return {};
+ return;
}
if (!shouldCollect(opCtx->getServiceContext())) {
- return {};
+ return;
}
BSONObjBuilder telemetryKey;
@@ -293,17 +354,17 @@ boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& reques
telemetryKey.append("applicationName", metadata->getApplicationName());
}
} catch (ExceptionFor<ErrorCodes::EncounteredFLEPayloadWhileRedacting>&) {
- return {};
+ return;
}
- return {telemetryKey.obj()};
+ opCtx->storeQueryBSON(telemetryKey.obj());
}
-boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx,
- const BSONObj& telemetryKey) {
+void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planExplainer) {
+ auto&& telemetryKey = planExplainer.getTelemetryKey();
if (telemetryKey.isEmpty() || !shouldCollect(opCtx->getServiceContext())) {
- return {};
+ return;
}
- return {telemetryKey};
+ opCtx->storeQueryBSON(telemetryKey);
}
std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(
@@ -315,33 +376,25 @@ std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* servic
return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore();
}
-void collectTelemetry(const ServiceContext* serviceCtx,
- const BSONObj& key,
- const OpDebug& opDebug,
- bool isExec) {
- auto&& getTelemetryStoreResult = getTelemetryStoreForRead(serviceCtx);
- auto telemetryStore = getTelemetryStoreResult.first;
- auto&& result = telemetryStore->getWithPartitionLock(key);
- auto statusWithMetrics = result.first;
- auto partitionLock = std::move(result.second);
- auto metrics = [&]() {
- if (statusWithMetrics.isOK()) {
- return statusWithMetrics.getValue();
- } else {
- TelemetryMetrics metrics;
- telemetryStore->put(key, metrics, partitionLock);
- auto newMetrics = partitionLock->get(key);
- // This can happen if the budget is immediately exceeded. Specifically if the there is
- // not enough room for a single new entry if the number of partitions is too high
- // relative to the size.
- tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
- return &newMetrics.getValue()->second;
- }
- }();
- if (isExec) {
- metrics->execCount++;
- metrics->queryOptMicros.aggregate(opDebug.planningTime.count());
+void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle) {
+ if (isFle) {
+ return;
+ }
+ auto&& telemetryKey = opCtx->getTelemetryKey();
+ if (telemetryKey.isEmpty()) {
+ return;
+ }
+ auto&& metrics = LockedMetrics::get(opCtx, telemetryKey);
+ metrics->execCount++;
+ metrics->queryOptMicros.aggregate(opDebug.planningTime.count());
+}
+
+void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug) {
+ auto&& telemetryKey = opCtx->getTelemetryKey();
+ if (telemetryKey.isEmpty()) {
+ return;
}
+ auto&& metrics = LockedMetrics::get(opCtx, telemetryKey);
metrics->docsReturned.aggregate(opDebug.nreturned);
metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0));
metrics->keysScanned.aggregate(opDebug.additiveMetrics.keysExamined.value_or(0));
diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h
index 6fd59884ac4..3a2f41a7e4e 100644
--- a/src/mongo/db/query/telemetry.h
+++ b/src/mongo/db/query/telemetry.h
@@ -163,33 +163,31 @@ std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* servic
bool isTelemetryEnabled(const ServiceContext* serviceCtx);
/**
- * Should we collect telemetry for a request? The decision is made based on the feature flag and
- * telemetry parameters such as rate limiting.
+ * Register a request for telemetry collection. The telemetry machinery may decide not to collect
+ * anything but this should be called for all requests. The decision is made based on the feature
+ * flag and telemetry parameters such as rate limiting.
*
- * If the return value is a telemetry key in the form of BSONObj, this indicates the telemetry
- * should be collected. Otherwise, telemetry should not be collected.
+ * The caller is still responsible for subsequently calling collectTelemetry() once the request is
+ * completed.
*
* Note that calling this affects internal state. It should be called once for each request for
* which telemetry may be collected.
*/
-boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request,
- const OperationContext* opCtx);
+void registerAggRequest(const AggregateCommandRequest& request, OperationContext* opCtx);
-boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request,
- const NamespaceString& collection,
- const OperationContext* opCtx);
+void registerFindRequest(const FindCommandRequest& request,
+ const NamespaceString& collection,
+ OperationContext* opCtx);
-boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx,
- const BSONObj& telemetryKey);
+void registerGetMoreRequest(OperationContext* opCtx, const PlanExplainer& planExplainer);
+
+void recordExecution(const OperationContext* opCtx, const OpDebug& opDebug, bool isFle);
/**
* Collect telemetry for the operation identified by `key`. The `isExec` flag should be set if it's
* the beginning of execution (first batch) of results and not set for subsequent getMore() calls.
*/
-void collectTelemetry(const ServiceContext* serviceCtx,
- const BSONObj& key,
- const OpDebug& opDebug,
- bool isExec);
+void collectTelemetry(const OperationContext* opCtx, const OpDebug& opDebug);
} // namespace telemetry
} // namespace mongo