summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJennifer Peshansky <jennifer.peshansky@mongodb.com>2022-11-07 20:54:09 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-14 21:50:26 +0000
commitd5613886915a0507b9bb8d716f63e3e4323ca17f (patch)
tree40337d9ef7103b8f542eab737dbe0d378919cef9
parent32ced838bccafe3a8274c7eb732af4ce337a7cc4 (diff)
downloadmongo-d5613886915a0507b9bb8d716f63e3e4323ca17f.tar.gz
SERVER-70647 Add entries to telemetry cache for find and getMore requests
-rw-r--r--jstests/noPassthrough/telemetry_cache_metrics.js76
-rw-r--r--src/mongo/db/commands/find_cmd.cpp10
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp10
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
-rw-r--r--src/mongo/db/operation_context.h13
-rw-r--r--src/mongo/db/pipeline/document_source_telemetry.h10
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp2
-rw-r--r--src/mongo/db/pipeline/plan_explainer_pipeline.h6
-rw-r--r--src/mongo/db/query/SConscript3
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp3
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp1
-rw-r--r--src/mongo/db/query/plan_explainer.h26
-rw-r--r--src/mongo/db/query/plan_explainer_factory.cpp11
-rw-r--r--src/mongo/db/query/plan_explainer_factory.h5
-rw-r--r--src/mongo/db/query/plan_explainer_impl.h10
-rw-r--r--src/mongo/db/query/plan_explainer_sbe.h3
-rw-r--r--src/mongo/db/query/query_knobs.idl6
-rw-r--r--src/mongo/db/query/rate_limiting.h7
-rw-r--r--src/mongo/db/query/sbe_cached_solution_planner.cpp12
-rw-r--r--src/mongo/db/query/telemetry.cpp179
-rw-r--r--src/mongo/db/query/telemetry.h78
-rw-r--r--src/mongo/db/query/telemetry_store_test.cpp4
22 files changed, 430 insertions, 59 deletions
diff --git a/jstests/noPassthrough/telemetry_cache_metrics.js b/jstests/noPassthrough/telemetry_cache_metrics.js
new file mode 100644
index 00000000000..2a261a0cbc6
--- /dev/null
+++ b/jstests/noPassthrough/telemetry_cache_metrics.js
@@ -0,0 +1,76 @@
+/**
+ * Test that the telemetry metrics are updated correctly and persist across getMores.
+ */
+(function() {
+"use strict";
+
+const conn = MongoRunner.runMongod({});
+const db = conn.getDB('test');
+
+var coll = db[jsTestName()];
+var collTwo = db[jsTestName() + 'Two'];
+coll.drop();
+
+for (var i = 0; i < 100; i++) {
+ coll.insert({foo: 0});
+ coll.insert({foo: 1});
+ collTwo.insert({foo: Math.random(0, 1), bar: Math.random(0, 1)});
+}
+
+function verifyTelemetryMetrics() {
+ const telStore = db.adminCommand({aggregate: 1, pipeline: [{$telemetry: {}}], cursor: {}});
+ // print(tojson(telStore));
+ const metrics = telStore.cursor.firstBatch[0].metrics;
+ print(tojson(metrics));
+ assert(metrics.execCount > 0);
+ assert(metrics.firstSeenTimestamp);
+ // assert(metrics.lastExecutionMicros > 0);
+ // assert(metrics.queryOptMicros.sum > 0);
+ // assert(metrics.queryExecMicros.sum > 0);
+ // assert(metrics.docsReturned.sum > 0);
+ // assert(metrics.docsScanned.sum > 0);
+ // assert(metrics.keysScanned.sum > 0);
+}
+
+let query;
+
+// agg query
+query = {
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {foo: {$linearFill: "$foo"}},
+ }
+};
+coll.aggregate([query]);
+verifyTelemetryMetrics();
+
+// agg query with some stages pushed to find layer.
+coll.aggregate([{$match: {foo: 0}}, {$group: {_id: null, count: {$sum: 1}}}]);
+verifyTelemetryMetrics();
+
+// agg query with all stages pushed to find layer.
+coll.aggregate([{$sort: {foo: 1}}]);
+verifyTelemetryMetrics();
+
+// multiple batches require multiple plan executors. We want to confirm we are only storing the
+// metrics for the outer executor associated with planning the query, and not a subsequent executor
+// that is constructed when a new operation context gets created during getMore() calls.
+// coll.aggregate([{$unionWith: collTwo.getName()}], {cursor: {batchSize: 2}});
+// verifyTelemetryMetrics();
+
+// $lookup runs an inner executor (with its own cursor); we want to confirm we are only reporting
+// metrics from the outer executor associated with planning the query.
+coll.aggregate({
+ $lookup: {from: collTwo.getName(), localField: "foo", foreignField: "bar", as: "merged_docs"}
+});
+verifyTelemetryMetrics();
+
+// Count and find have different entry points (e.g. different run() methods) from agg, and we want
+// to confirm we are starting the timer as planning begins in each of these workflows/paths.
+coll.count({foo: 0});
+verifyTelemetryMetrics();
+
+query = coll.findOne({});
+verifyTelemetryMetrics(query);
+MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 1658ee2b9de..cde3be5c63c 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/query_knobs_gen.h"
+#include "mongo/db/query/telemetry.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/service_context.h"
@@ -744,6 +745,15 @@ public:
metricsCollector.incrementDocUnitsReturned(nss.ns(), docUnitsReturned);
query_request_helper::validateCursorResponse(result->getBodyBuilder().asTempObj(),
nss.tenantId());
+
+ auto telemetryKey =
+ telemetry::shouldCollectTelemetry(originalFC, collection.get()->ns(), opCtx);
+ if (telemetryKey) {
+ opCtx->storeQueryBSON(*telemetryKey);
+
+ telemetry::collectTelemetry(
+ opCtx->getServiceContext(), *telemetryKey, CurOp::get(opCtx)->debug(), true);
+ }
}
void appendMirrorableRequest(BSONObjBuilder* bob) const override {
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 02b5739397c..e5f62335112 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/query/getmore_command_gen.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/query/telemetry.h"
#include "mongo/db/read_concern.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -692,6 +693,15 @@ public:
// response batch.
curOp->debug().nreturned = numResults;
+ auto telemetryKey = telemetry::shouldCollectTelemetry(
+ opCtx, exec->getPlanExplainer().getTelemetryKey());
+ if (telemetryKey) {
+ telemetry::collectTelemetry(opCtx->getServiceContext(),
+ exec->getPlanExplainer().getTelemetryKey(),
+ curOp->debug(),
+ false);
+ }
+
if (respondWithId) {
cursorDeleter.dismiss();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 1e46d2a8894..159fd47923a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -725,6 +725,7 @@ Status runAggregate(OperationContext* opCtx,
boost::intrusive_ptr<ExpressionContext> expCtx;
auto curOp = CurOp::get(opCtx);
auto catalog = CollectionCatalog::get(opCtx);
+ boost::optional<BSONObj> telemetryKey;
{
// If we are in a transaction, check whether the parsed pipeline supports being in
@@ -1095,6 +1096,19 @@ Status runAggregate(OperationContext* opCtx,
curOp->debug().setPlanSummaryMetrics(stats);
curOp->debug().nreturned = stats.nReturned;
+ telemetryKey = telemetry::shouldCollectTelemetry(request, opCtx);
+ // Build the telemetry key and store it in the operation context
+ if (telemetryKey) {
+ // TODO SERVER-71315: should we store it in the CurOp instead? (or even PlanExplainer)
+ opCtx->storeQueryBSON(*telemetryKey);
+ }
+
+
+ if (telemetryKey) {
+ telemetry::collectTelemetry(
+ opCtx->getServiceContext(), *telemetryKey, curOp->debug(), true);
+ }
+
// For an optimized away pipeline, signal the cache that a query operation has completed.
// For normal pipelines this is done in DocumentSourceCursor.
if (ctx) {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 29db5f6385b..173753c28ec 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -416,6 +416,18 @@ public:
// cache.
return Microseconds{-1};
}
+
+ BSONObj getTelemetryKey() const {
+ return _originalQueryBSON;
+ }
+
+ void storeQueryBSON(BSONObj originalBSON) {
+ if (_originalQueryBSON.isEmpty()) {
+ _originalQueryBSON = originalBSON.copy();
+ }
+ }
+
+
/**
* Sets the deadline for this operation to the given point in time.
*
@@ -822,6 +834,7 @@ private:
std::unique_ptr<Timer> _planningTimer = nullptr;
Microseconds _timeElapsedPlanning;
+ BSONObj _originalQueryBSON;
bool _writesAreReplicated = true;
bool _shouldIncrementLatencyStats = true;
bool _inMultiDocumentTransaction = false;
diff --git a/src/mongo/db/pipeline/document_source_telemetry.h b/src/mongo/db/pipeline/document_source_telemetry.h
index d3a7dfd78e2..7d702aeb2b0 100644
--- a/src/mongo/db/pipeline/document_source_telemetry.h
+++ b/src/mongo/db/pipeline/document_source_telemetry.h
@@ -36,6 +36,8 @@
namespace mongo {
+using namespace telemetry;
+
class DocumentSourceTelemetry final : public DocumentSource {
public:
static constexpr StringData kStageName = "$telemetry"_sd;
@@ -143,13 +145,7 @@ private:
*/
void waitForEmpty() {
stdx::unique_lock lk{_mutex};
- _waitForEmpty.wait(lk, [&] {
- try {
- return !_queue.tryPop();
- } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueConsumed>&) {
- return true;
- }
- });
+ _waitForEmpty.wait(lk, [&] { return _queue.getStats().queueDepth == 0; });
}
private:
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index d7f2cc76225..67ed5841fd4 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -47,7 +47,7 @@ PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContex
Microseconds timeElapsedPlanning)
: _expCtx(std::move(expCtx)),
_pipeline(std::move(pipeline)),
- _planExplainer{_pipeline.get(), timeElapsedPlanning},
+ _planExplainer{_pipeline.get(), timeElapsedPlanning, _expCtx->opCtx->getTelemetryKey()},
_resumableScanType{resumableScanType} {
// Pipeline plan executors must always have an ExpressionContext.
invariant(_expCtx);
diff --git a/src/mongo/db/pipeline/plan_explainer_pipeline.h b/src/mongo/db/pipeline/plan_explainer_pipeline.h
index a5c92ea4064..14d654487ba 100644
--- a/src/mongo/db/pipeline/plan_explainer_pipeline.h
+++ b/src/mongo/db/pipeline/plan_explainer_pipeline.h
@@ -39,8 +39,10 @@ namespace mongo {
*/
class PlanExplainerPipeline final : public PlanExplainer {
public:
- PlanExplainerPipeline(const Pipeline* pipeline, Microseconds timeElapsedPlanning)
- : PlanExplainer{timeElapsedPlanning}, _pipeline{pipeline} {}
+ PlanExplainerPipeline(const Pipeline* pipeline,
+ Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey)
+ : PlanExplainer{timeElapsedPlanning, telemetryKey}, _pipeline{pipeline} {}
bool isMultiPlan() const final {
return false;
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 1c320503176..02890313b21 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -375,9 +375,12 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ "$BUILD_DIR/mongo/rpc/client_metadata",
'$BUILD_DIR/mongo/util/processinfo',
+ 'command_request_response',
'memory_util',
'query_knobs',
+ 'rate_limiting',
],
)
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index e764578fad4..693be64c962 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -133,7 +133,8 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx,
_workingSet(std::move(ws)),
_qs(std::move(qs)),
_root(std::move(rt)),
- _planExplainer(plan_explainer_factory::make(_root.get(), timeElapsedPlanning)),
+ _planExplainer(
+ plan_explainer_factory::make(_root.get(), timeElapsedPlanning, opCtx->getTelemetryKey())),
_mustReturnOwnedBson(returnOwnedBson),
_nss(std::move(nss)) {
invariant(!_expCtx || _expCtx->opCtx == _opCtx);
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index e6d6be12927..791408aeb9c 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -127,6 +127,7 @@ PlanExecutorSBE::PlanExecutorSBE(OperationContext* opCtx,
isMultiPlan,
isCachedCandidate,
timeElapsedPlanning,
+ opCtx->getTelemetryKey(),
_rootData.debugInfo);
}
diff --git a/src/mongo/db/query/plan_explainer.h b/src/mongo/db/query/plan_explainer.h
index f19d9aad462..3f377da7821 100644
--- a/src/mongo/db/query/plan_explainer.h
+++ b/src/mongo/db/query/plan_explainer.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/bson/bsonobj.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/query/classic_plan_cache.h"
#include "mongo/db/query/explain_options.h"
@@ -58,14 +59,22 @@ public:
using PlanStatsDetails = std::pair<BSONObj, boost::optional<PlanSummaryStats>>;
PlanExplainer() {}
- PlanExplainer(const QuerySolution* solution, Microseconds timeElapsedPlanning)
+ PlanExplainer(const QuerySolution* solution,
+ Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey)
: _enumeratorExplainInfo{solution ? solution->_enumeratorExplainInfo
: PlanEnumeratorExplainInfo{}},
- _timeElapsedPlanning{timeElapsedPlanning} {}
- PlanExplainer(Microseconds timeElapsedPlanning) : _timeElapsedPlanning{timeElapsedPlanning} {}
+ _timeElapsedPlanning{timeElapsedPlanning},
+ _telemetryKey{telemetryKey} {}
+ PlanExplainer(Microseconds timeElapsedPlanning, BSONObj telemetryKey)
+ : _timeElapsedPlanning{timeElapsedPlanning}, _telemetryKey{telemetryKey} {}
PlanExplainer(const PlanEnumeratorExplainInfo& info) : _enumeratorExplainInfo{info} {}
- PlanExplainer(const PlanEnumeratorExplainInfo& info, Microseconds timeElapsedPlanning)
- : _enumeratorExplainInfo{info}, _timeElapsedPlanning{timeElapsedPlanning} {}
+ PlanExplainer(const PlanEnumeratorExplainInfo& info,
+ Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey)
+ : _enumeratorExplainInfo{info},
+ _timeElapsedPlanning{timeElapsedPlanning},
+ _telemetryKey{telemetryKey} {}
virtual ~PlanExplainer() = default;
@@ -144,9 +153,16 @@ public:
void updateEnumeratorExplainInfo(const PlanEnumeratorExplainInfo& other) {
_enumeratorExplainInfo.merge(other);
}
+ Microseconds getTimeElapsedPlanning() const {
+ return _timeElapsedPlanning;
+ }
+ BSONObj getTelemetryKey() const {
+ return _telemetryKey;
+ }
protected:
PlanEnumeratorExplainInfo _enumeratorExplainInfo;
Microseconds _timeElapsedPlanning{0};
+ BSONObj _telemetryKey;
};
} // namespace mongo
diff --git a/src/mongo/db/query/plan_explainer_factory.cpp b/src/mongo/db/query/plan_explainer_factory.cpp
index 8ca029050db..5eace0d82ac 100644
--- a/src/mongo/db/query/plan_explainer_factory.cpp
+++ b/src/mongo/db/query/plan_explainer_factory.cpp
@@ -42,8 +42,10 @@ std::unique_ptr<PlanExplainer> make(PlanStage* root) {
return std::make_unique<PlanExplainerImpl>(root);
}
-std::unique_ptr<PlanExplainer> make(PlanStage* root, Microseconds timElapsedPlanning) {
- return std::make_unique<PlanExplainerImpl>(root, timElapsedPlanning);
+std::unique_ptr<PlanExplainer> make(PlanStage* root,
+ Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey) {
+ return std::make_unique<PlanExplainerImpl>(root, timeElapsedPlanning, telemetryKey);
}
std::unique_ptr<PlanExplainer> make(PlanStage* root, const PlanEnumeratorExplainInfo& explainInfo) {
return std::make_unique<PlanExplainerImpl>(root, explainInfo);
@@ -72,7 +74,8 @@ std::unique_ptr<PlanExplainer> make(sbe::PlanStage* root,
std::move(rejectedCandidates),
isMultiPlan,
false, /* isFromPlanCache */
- Microseconds{0}, /* planningTimeElapsed*/
+ Microseconds{0}, /* timeElapsedPlanning*/
+ BSONObj(),
debugInfoSBE);
}
@@ -85,6 +88,7 @@ std::unique_ptr<PlanExplainer> make(
bool isMultiPlan,
bool isFromPlanCache,
Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey,
std::shared_ptr<const plan_cache_debug_info::DebugInfoSBE> debugInfoSBE) {
// TODO SERVER-64882: Consider invariant(debugInfoSBE) as we may not need to create a
// DebugInfoSBE from QuerySolution after the feature flag is removed. We currently need it
@@ -102,6 +106,7 @@ std::unique_ptr<PlanExplainer> make(
isMultiPlan,
isFromPlanCache,
timeElapsedPlanning,
+ telemetryKey,
debugInfoSBE);
}
} // namespace mongo::plan_explainer_factory
diff --git a/src/mongo/db/query/plan_explainer_factory.h b/src/mongo/db/query/plan_explainer_factory.h
index 3c51811a3fd..d804666a8d0 100644
--- a/src/mongo/db/query/plan_explainer_factory.h
+++ b/src/mongo/db/query/plan_explainer_factory.h
@@ -40,7 +40,9 @@
namespace mongo::plan_explainer_factory {
std::unique_ptr<PlanExplainer> make(PlanStage* root);
-std::unique_ptr<PlanExplainer> make(PlanStage* root, Microseconds timeElapsedPlanning);
+std::unique_ptr<PlanExplainer> make(PlanStage* root,
+ Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey);
std::unique_ptr<PlanExplainer> make(PlanStage* root,
const PlanEnumeratorExplainInfo& enumeratorInfo);
@@ -65,5 +67,6 @@ std::unique_ptr<PlanExplainer> make(
bool isMultiPlan,
bool isFromPlanCache,
Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey,
std::shared_ptr<const plan_cache_debug_info::DebugInfoSBE> debugInfo);
} // namespace mongo::plan_explainer_factory
diff --git a/src/mongo/db/query/plan_explainer_impl.h b/src/mongo/db/query/plan_explainer_impl.h
index 60cf650fb7f..cf7264d6217 100644
--- a/src/mongo/db/query/plan_explainer_impl.h
+++ b/src/mongo/db/query/plan_explainer_impl.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/bson/bsonobj.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/query/plan_enumerator_explain_info.h"
#include "mongo/db/query/plan_explainer.h"
@@ -47,12 +48,13 @@ class PlanExplainerImpl final : public PlanExplainer {
public:
PlanExplainerImpl(PlanStage* root,
const PlanEnumeratorExplainInfo& explainInfo,
- Microseconds timeElapsedPlanning)
- : PlanExplainer{explainInfo, timeElapsedPlanning}, _root{root} {}
+ Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey)
+ : PlanExplainer{explainInfo, timeElapsedPlanning, telemetryKey}, _root{root} {}
PlanExplainerImpl(PlanStage* root, const PlanEnumeratorExplainInfo& explainInfo)
: PlanExplainer{explainInfo}, _root{root} {}
- PlanExplainerImpl(PlanStage* root, Microseconds timeElapsedPlanning)
- : PlanExplainer{timeElapsedPlanning}, _root{root} {}
+ PlanExplainerImpl(PlanStage* root, Microseconds timeElapsedPlanning, BSONObj telemetryKey)
+ : PlanExplainer{timeElapsedPlanning, telemetryKey}, _root{root} {}
PlanExplainerImpl(PlanStage* root) : _root{root} {}
const ExplainVersion& getVersion() const final;
bool isMultiPlan() const final;
diff --git a/src/mongo/db/query/plan_explainer_sbe.h b/src/mongo/db/query/plan_explainer_sbe.h
index 28b696f3c75..be145591518 100644
--- a/src/mongo/db/query/plan_explainer_sbe.h
+++ b/src/mongo/db/query/plan_explainer_sbe.h
@@ -51,8 +51,9 @@ public:
bool isMultiPlan,
bool isCachedPlan,
Microseconds timeElapsedPlanning,
+ BSONObj telemetryKey,
std::shared_ptr<const plan_cache_debug_info::DebugInfoSBE> debugInfo)
- : PlanExplainer{solution, timeElapsedPlanning},
+ : PlanExplainer{solution, timeElapsedPlanning, telemetryKey},
_root{root},
_rootData{data},
_solution{solution},
diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl
index 0b743fc94a3..a4cdc297567 100644
--- a/src/mongo/db/query/query_knobs.idl
+++ b/src/mongo/db/query/query_knobs.idl
@@ -952,12 +952,12 @@ server_parameters:
internalQueryConfigureTelemetrySamplingRate:
description: "The maximum number of queries per second that are sampled for query telemetry.
If the rate of queries goes above this number, then rate limiting will kick in, and any
- further queries will not be sampled. The default is INT_MAX, effectively meaning that all
- queries will be sampled. This can be set to 0 to turn telemetry off completely."
+ further queries will not be sampled. To sample all queries, this can be set to the INT_MAX.
+ This can be set to 0 to turn telemetry off completely."
set_at: [ startup, runtime ]
cpp_varname: "queryTelemetrySamplingRate"
cpp_vartype: AtomicWord<int>
- default: 2147483647
+ default: 0
internalQueryConfigureTelemetryCacheSize:
description: "The maximum amount of memory that the system will allocate for the query telemetry
diff --git a/src/mongo/db/query/rate_limiting.h b/src/mongo/db/query/rate_limiting.h
index 959ac52692e..1083562ae06 100644
--- a/src/mongo/db/query/rate_limiting.h
+++ b/src/mongo/db/query/rate_limiting.h
@@ -50,6 +50,13 @@ public:
RateLimiting(RequestCount samplingRate, Milliseconds timePeriod = Seconds{1});
/*
+ * Getter for the sampling rate.
+ */
+ RequestCount getSamplingRate() {
+ return _samplingRate;
+ }
+
+ /*
* A simple method for rate limiting. Returns false if we have reached the request limit for the
* current time window; otherwise, returns true and adds the request to the count for the
* current window. If we have passed the end of the previous window, the slate is wiped clean.
diff --git a/src/mongo/db/query/sbe_cached_solution_planner.cpp b/src/mongo/db/query/sbe_cached_solution_planner.cpp
index 2d43a763c83..fd3bb312c8a 100644
--- a/src/mongo/db/query/sbe_cached_solution_planner.cpp
+++ b/src/mongo/db/query/sbe_cached_solution_planner.cpp
@@ -129,13 +129,13 @@ CandidatePlans CachedSolutionPlanner::plan(
candidate.root.get(),
&candidate.data,
candidate.solution.get(),
- {}, /* optimizedData */
- {}, /* rejectedCandidates */
- false, /* isMultiPlan */
- true, /* isFromPlanCache */
- _opCtx->getElapsedQueryPlanningTime() /* metric stored in PlanExplainer via PlanExecutor
+ {}, /* optimizedData */
+ {}, /* rejectedCandidates */
+ false, /* isMultiPlan */
+ true, /* isFromPlanCache */
+ _opCtx->getElapsedQueryPlanningTime(), /* metric stored in PlanExplainer via PlanExecutor
construction*/
- ,
+ _opCtx->getTelemetryKey(),
candidate.data.debugInfo
? std::make_unique<plan_cache_debug_info::DebugInfoSBE>(*candidate.data.debugInfo)
: nullptr);
diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp
index 468ebb6a396..e0ed2c4018e 100644
--- a/src/mongo/db/query/telemetry.cpp
+++ b/src/mongo/db/query/telemetry.cpp
@@ -29,18 +29,30 @@
#include "mongo/db/query/telemetry.h"
+#include "mongo/crypto/hash_block.h"
+#include "mongo/crypto/sha256_block.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/locker.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/query/find_command_gen.h"
+#include "mongo/db/query/plan_explainer.h"
#include "mongo/db/query/query_planner_params.h"
+#include "mongo/db/query/rate_limiting.h"
+#include "mongo/db/query/telemetry_util.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/util/processinfo.h"
+#include "mongo/util/system_clock_source.h"
#include <cstddef>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
namespace mongo {
+namespace telemetry {
+
namespace {
/**
@@ -49,8 +61,9 @@ namespace {
*
* - Updating the telemetry store uses the `getTelemetryStore()` method. The telemetry store
* instance is obtained, entries are looked up and mutated, or created anew.
- * - The telemetry store is "reset". This involves atomically allocating a new instance, once there
- * are no more updaters (readers of the store "pointer"), and returning the existing instance.
+ * - The telemetry store is "reset". This involves atomically allocating a new instance, once
+ * there are no more updaters (readers of the store "pointer"), and returning the existing
+ * instance.
*/
class TelemetryStoreManager {
public:
@@ -63,8 +76,8 @@ public:
/**
* Acquire the instance of the telemetry store. The telemetry store is mutable and a shared
- * "read lock" is obtained on the instance. That is, the telemetry store instance will not be
- * replaced.
+ * "read lock" is obtained on the instance. That is, the telemetry store instance will not
+ * be replaced.
*/
std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStore() {
return std::make_pair(&*_telemetryStore, Lock::SharedLock{&_instanceLock, _instanceMutex});
@@ -104,8 +117,8 @@ public:
auto cappedSize = memory_util::capMemorySize(
newSizeBytes /*requestedSize*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
- /* If capped size is less than requested size, the telemetry store has been capped at its
- * upper limit*/
+ /* If capped size is less than requested size, the telemetry store has been capped at
+ * its upper limit*/
if (cappedSize < newSizeBytes) {
LOGV2_DEBUG(7106503,
1,
@@ -118,6 +131,8 @@ public:
}
};
+const auto telemetryRateLimiter =
+ ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>();
ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
"TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) {
@@ -128,8 +143,8 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
auto size = memory_util::getRequestedMemSizeInBytes(status.getValue());
auto cappedStoreSize = memory_util::capMemorySize(
size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/);
- /* If capped size is less than requested size, the telemetry store has been capped at its
- * upper limit*/
+ // If capped size is less than requested size, the telemetry store has been capped at its
+ // upper limit.
if (cappedStoreSize < size) {
LOGV2_DEBUG(7106502,
1,
@@ -137,19 +152,159 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{
"cappedSize"_attr = cappedStoreSize);
}
auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx);
- globalTelemetryStoreManager = std::make_unique<TelemetryStoreManager>(
- serviceCtx, cappedStoreSize, ProcessInfo::getNumCores());
+ const int kNumPartitions = 100; // the more the merrier.
+ globalTelemetryStoreManager =
+ std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, kNumPartitions);
+ // TODO there will be a rate limiter initialized somewhere, and we can get the value from
+ // there to save a .load(). We need the rate limiter to do rate limiting here anyway. int
+ // samplingRate = queryTelemetrySamplingRate.load(); Quick escape if it's turned off? if
+ // (!samplingRate) {
+ // return;
+ //}
+ telemetryRateLimiter(serviceCtx) =
+ std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load());
}};
+bool isTelemetryEnabled(const ServiceContext* serviceCtx) {
+ return telemetryRateLimiter(serviceCtx)->getSamplingRate() > 0;
+}
+
+/**
+ * Internal check for whether we should collect metrics. This checks the rate limiting
+ * configuration for a global on/off decision and, if enabled, delegates to the rate limiter.
+ */
+bool shouldCollect(const ServiceContext* serviceCtx) {
+ // Quick escape if telemetry is turned off.
+ if (!isTelemetryEnabled(serviceCtx)) {
+ return false;
+ }
+ // Check if rate limiting allows us to accumulate.
+ if (!telemetryRateLimiter(serviceCtx)->handleRequestSlidingWindow()) {
+ return false;
+ }
+ // TODO SERVER-71244: check if it's a FLE collection here (maybe pass in the request)
+ return true;
+}
+
+/**
+ * Add a field to the find op's telemetry key. The `value` will be redacted.
+ */
+void addToFindKey(BSONObjBuilder& builder, const StringData& fieldName, const BSONObj& value) {
+ serializeBSONWhenNotEmpty(value.redact(false), fieldName, &builder);
+}
+
} // namespace
+boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request,
+ const OperationContext* opCtx) {
+ if (request.getEncryptionInformation()) {
+ return {};
+ }
+
+ if (!shouldCollect(opCtx->getServiceContext())) {
+ return {};
+ }
+
+ BSONObjBuilder telemetryKey;
+ BSONObjBuilder pipelineBuilder = telemetryKey.subarrayStart("pipeline"_sd);
+ for (auto&& stage : request.getPipeline()) {
+ auto el = stage.firstElement();
+ BSONObjBuilder stageBuilder = pipelineBuilder.subobjStart("stage"_sd);
+ stageBuilder.append(el.fieldNameStringData(), el.Obj().redact(false));
+ stageBuilder.done();
+ }
+ pipelineBuilder.done();
+ telemetryKey.append("namespace", request.getNamespace().toString());
+ if (request.getReadConcern()) {
+ telemetryKey.append("readConcern", *request.getReadConcern());
+ }
+ if (auto metadata = ClientMetadata::get(opCtx->getClient())) {
+ telemetryKey.append("applicationName", metadata->getApplicationName());
+ }
+ return {telemetryKey.obj()};
+}
+
+boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request,
+ const NamespaceString& collection,
+ const OperationContext* opCtx) {
+ if (request.getEncryptionInformation()) {
+ return {};
+ }
+
+ if (!shouldCollect(opCtx->getServiceContext())) {
+ return {};
+ }
+
+ BSONObjBuilder telemetryKey;
+ BSONObjBuilder findBuilder = telemetryKey.subobjStart("find"_sd);
+ auto findBson = request.toBSON({});
+ for (auto&& findEntry : findBson) {
+ if (findEntry.isABSONObj()) {
+ telemetryKey.append(findEntry.fieldNameStringData(), findEntry.Obj().redact(false));
+ } else {
+ telemetryKey.append(findEntry.fieldNameStringData(), "###"_sd);
+ }
+ }
+ findBuilder.done();
+ telemetryKey.append("namespace", collection.toString());
+ if (request.getReadConcern()) {
+ telemetryKey.append("readConcern", *request.getReadConcern());
+ }
+ if (auto metadata = ClientMetadata::get(opCtx->getClient())) {
+ telemetryKey.append("applicationName", metadata->getApplicationName());
+ }
+ return {telemetryKey.obj()};
+}
+
+boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx,
+ const BSONObj& telemetryKey) {
+ if (telemetryKey.isEmpty() || !shouldCollect(opCtx->getServiceContext())) {
+ return {};
+ }
+ return {telemetryKey};
+}
+
std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(
- ServiceContext* serviceCtx) {
+ const ServiceContext* serviceCtx) {
return telemetryStoreDecoration(serviceCtx)->getTelemetryStore();
}
-std::unique_ptr<TelemetryStore> resetTelemetryStore(ServiceContext* serviceCtx) {
+std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx) {
return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore();
}
+void collectTelemetry(const ServiceContext* serviceCtx,
+ const BSONObj& key,
+ const OpDebug& opDebug,
+ bool isExec) {
+ auto&& getTelemetryStoreResult = getTelemetryStoreForRead(serviceCtx);
+ auto telemetryStore = getTelemetryStoreResult.first;
+ auto&& result = telemetryStore->getWithPartitionLock(key);
+ auto statusWithMetrics = result.first;
+ auto partitionLock = std::move(result.second);
+ auto metrics = [&]() {
+ if (statusWithMetrics.isOK()) {
+ return statusWithMetrics.getValue();
+ } else {
+ TelemetryMetrics metrics;
+ telemetryStore->put(key, metrics, partitionLock);
+ auto newMetrics = partitionLock->get(key);
+            // This can happen if the budget is immediately exceeded. Specifically, if there is
+            // not enough room for a single new entry because the number of partitions is too high
+            // relative to the size.
+ tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
+ return &newMetrics.getValue()->second;
+ }
+ }();
+ if (isExec) {
+ metrics->execCount++;
+ metrics->queryOptMicros.aggregate(opDebug.planningTime.count());
+ }
+ metrics->docsReturned.aggregate(opDebug.nreturned);
+ metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0));
+ metrics->keysScanned.aggregate(opDebug.additiveMetrics.keysExamined.value_or(0));
+ metrics->lastExecutionMicros = opDebug.executionTime.count();
+ metrics->queryExecMicros.aggregate(opDebug.executionTime.count());
+}
+} // namespace telemetry
} // namespace mongo
diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h
index 1e60df54e04..6fd59884ac4 100644
--- a/src/mongo/db/query/telemetry.h
+++ b/src/mongo/db/query/telemetry.h
@@ -31,17 +31,26 @@
#include "mongo/base/status.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/curop.h"
#include "mongo/db/query/partitioned_cache.h"
+#include "mongo/db/query/plan_explainer.h"
#include "mongo/db/query/util/memory_util.h"
#include "mongo/db/service_context.h"
namespace mongo {
+class OpDebug;
+class AggregateCommandRequest;
+class FindCommandRequest;
+
+namespace {
/**
* Type we use to render values to BSON.
*/
using BSONNumeric = long long;
+} // namespace
+namespace telemetry {
/**
* An aggregated metric stores a compressed view of data. It balances the loss of information with
* the reduction in required storage.
@@ -58,13 +67,19 @@ struct AggregatedMetric {
sumOfSquares += val * val;
}
- BSONObj toBSON() const {
- return BSON("sum" << (BSONNumeric)sum << "max" << (BSONNumeric)sum << "min"
- << (BSONNumeric)sum << "sumOfSquares" << (BSONNumeric)sum);
+ void appendTo(BSONObjBuilder& builder, const StringData& fieldName) const {
+ BSONObjBuilder metricsBuilder = builder.subobjStart(fieldName);
+ metricsBuilder.append("sum", (BSONNumeric)sum);
+ metricsBuilder.append("max", (BSONNumeric)max);
+ metricsBuilder.append("min", (BSONNumeric)min);
+ metricsBuilder.append("sumOfSquares", (BSONNumeric)sumOfSquares);
+ metricsBuilder.done();
}
uint64_t sum = 0;
- uint64_t min = 0;
+ // Default to the _signed_ maximum (which fits in unsigned range) because we cast to BSONNumeric
+ // when serializing.
+    uint64_t min = (uint64_t)std::numeric_limits<int64_t>::max();
uint64_t max = 0;
/**
@@ -76,19 +91,27 @@ struct AggregatedMetric {
class TelemetryMetrics {
public:
+ TelemetryMetrics() : firstSeenTimestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0) {}
+
BSONObj toBSON() const {
BSONObjBuilder builder{sizeof(TelemetryMetrics) + 100};
builder.append("lastExecutionMicros", (BSONNumeric)lastExecutionMicros);
builder.append("execCount", (BSONNumeric)execCount);
- builder.append("queryOptTime", queryOptMicros.toBSON());
- builder.append("queryExecMicros", queryExecMicros.toBSON());
- builder.append("docsReturned", docsReturned.toBSON());
- builder.append("docsScanned", docsScanned.toBSON());
- builder.append("keysScanned", keysScanned.toBSON());
+ queryOptMicros.appendTo(builder, "queryOptMicros");
+ queryExecMicros.appendTo(builder, "queryExecMicros");
+ docsReturned.appendTo(builder, "docsReturned");
+ docsScanned.appendTo(builder, "docsScanned");
+ keysScanned.appendTo(builder, "keysScanned");
+ builder.append("firstSeenTimestamp", firstSeenTimestamp);
return builder.obj();
}
/**
+ * Timestamp for when this query shape was added to the store. Set on construction.
+ */
+ const Timestamp firstSeenTimestamp;
+
+ /**
* Last execution time in microseconds.
*/
uint64_t lastExecutionMicros = 0;
@@ -132,8 +155,41 @@ using TelemetryStore = PartitionedCache<BSONObj,
/**
* Acquire a reference to the global telemetry store.
*/
-std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(ServiceContext* serviceCtx);
+std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(
+ const ServiceContext* serviceCtx);
+
+std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx);
-std::unique_ptr<TelemetryStore> resetTelemetryStore(ServiceContext* serviceCtx);
+bool isTelemetryEnabled(const ServiceContext* serviceCtx);
+
+/**
+ * Should we collect telemetry for a request? The decision is made based on the feature flag and
+ * telemetry parameters such as rate limiting.
+ *
+ * If the return value is a telemetry key in the form of BSONObj, this indicates the telemetry
+ * should be collected. Otherwise, telemetry should not be collected.
+ *
+ * Note that calling this affects internal state. It should be called once for each request for
+ * which telemetry may be collected.
+ */
+boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request,
+ const OperationContext* opCtx);
+
+boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request,
+ const NamespaceString& collection,
+ const OperationContext* opCtx);
+
+boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx,
+ const BSONObj& telemetryKey);
+
+/**
+ * Collect telemetry for the operation identified by `key`. The `isExec` flag should be set for
+ * the initial execution (first batch) of the query and unset for subsequent getMore() calls.
+ */
+void collectTelemetry(const ServiceContext* serviceCtx,
+ const BSONObj& key,
+ const OpDebug& opDebug,
+ bool isExec);
+} // namespace telemetry
} // namespace mongo
diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp
index 4edc6e1ec88..c05a6fb0aac 100644
--- a/src/mongo/db/query/telemetry_store_test.cpp
+++ b/src/mongo/db/query/telemetry_store_test.cpp
@@ -32,7 +32,7 @@
#include "mongo/db/query/telemetry.h"
#include "mongo/unittest/unittest.h"
-namespace mongo {
+namespace mongo::telemetry {
class TelemetryStoreTest : public unittest::Test {
protected:
@@ -96,4 +96,4 @@ TEST_F(TelemetryStoreTest, BasicUsage) {
ASSERT_EQ(numKeys, 2);
}
-} // namespace mongo
+} // namespace mongo::telemetry