diff options
22 files changed, 430 insertions, 59 deletions
diff --git a/jstests/noPassthrough/telemetry_cache_metrics.js b/jstests/noPassthrough/telemetry_cache_metrics.js new file mode 100644 index 00000000000..2a261a0cbc6 --- /dev/null +++ b/jstests/noPassthrough/telemetry_cache_metrics.js @@ -0,0 +1,76 @@ +/** + * Test that the telemetry metrics are updated correctly and persist across getMores. + */ +(function() { +"use strict"; + +const conn = MongoRunner.runMongod({}); +const db = conn.getDB('test'); + +var coll = db[jsTestName()]; +var collTwo = db[jsTestName() + 'Two']; +coll.drop(); + +for (var i = 0; i < 100; i++) { + coll.insert({foo: 0}); + coll.insert({foo: 1}); + collTwo.insert({foo: Math.random(0, 1), bar: Math.random(0, 1)}); +} + +function verifyTelemetryMetrics() { + const telStore = db.adminCommand({aggregate: 1, pipeline: [{$telemetry: {}}], cursor: {}}); + // print(tojson(telStore)); + const metrics = telStore.cursor.firstBatch[0].metrics; + print(tojson(metrics)); + assert(metrics.execCount > 0); + assert(metrics.firstSeenTimestamp); + // assert(metrics.lastExecutionMicros > 0); + // assert(metrics.queryOptMicros.sum > 0); + // assert(metrics.queryExecMicros.sum > 0); + // assert(metrics.docsReturned.sum > 0); + // assert(metrics.docsScanned.sum > 0); + // assert(metrics.keysScanned.sum > 0); +} + +let query; + +// agg query +query = { + $setWindowFields: { + sortBy: {_id: 1}, + output: {foo: {$linearFill: "$foo"}}, + } +}; +coll.aggregate([query]); +verifyTelemetryMetrics(); + +// agg query with some stages pushed to find layer. +coll.aggregate([{$match: {foo: 0}}, {$group: {_id: null, count: {$sum: 1}}}]); +verifyTelemetryMetrics(); + +// agg query with all stages pushed to find layer. +coll.aggregate([{$sort: {foo: 1}}]); +verifyTelemetryMetrics(); + +// multiple batches require multiple plan executors. We want to confirm we are only storing the +// metrics for the outer executor associated with planning the query, and not a subsequent executor +// that is constructed when a new operation context gets created during getMore() calls. +// coll.aggregate([{$unionWith: collTwo.getName()}], {cursor: {batchSize: 2}}); +// verifyTelemetryMetrics(); + +// $lookup has inner executor (cursor??), we want to confirm we are only reporting metrics from the +// outer executor associated with planning the query. +coll.aggregate({ + $lookup: {from: collTwo.getName(), localField: "foo", foreignField: "bar", as: "merged_docs"} +}); +verifyTelemetryMetrics(); + +// Count and find have different entry points (eg different run() methods) from agg and we want to +// confirm we are starting the timer as planning begins in each of these workflows/paths. +coll.count({foo: 0}); +verifyTelemetryMetrics(); + +query = coll.findOne({}); +verifyTelemetryMetrics(query); +MongoRunner.stopMongod(conn); +})(); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 1658ee2b9de..cde3be5c63c 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -55,6 +55,7 @@ #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/query/telemetry.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/service_context.h" @@ -744,6 +745,15 @@ public: metricsCollector.incrementDocUnitsReturned(nss.ns(), docUnitsReturned); query_request_helper::validateCursorResponse(result->getBodyBuilder().asTempObj(), nss.tenantId()); + + auto telemetryKey = + telemetry::shouldCollectTelemetry(originalFC, collection.get()->ns(), opCtx); + if (telemetryKey) { + opCtx->storeQueryBSON(*telemetryKey); + + telemetry::collectTelemetry( + opCtx->getServiceContext(), *telemetryKey, CurOp::get(opCtx)->debug(), true); + } } void appendMirrorableRequest(BSONObjBuilder* bob) const override { diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 02b5739397c..e5f62335112 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -53,6 +53,7 @@ #include "mongo/db/query/getmore_command_gen.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_summary_stats.h" +#include "mongo/db/query/telemetry.h" #include "mongo/db/read_concern.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" @@ -692,6 +693,15 @@ public: // response batch. curOp->debug().nreturned = numResults; + auto telemetryKey = telemetry::shouldCollectTelemetry( + opCtx, exec->getPlanExplainer().getTelemetryKey()); + if (telemetryKey) { + telemetry::collectTelemetry(opCtx->getServiceContext(), + exec->getPlanExplainer().getTelemetryKey(), + curOp->debug(), + false); + } + if (respondWithId) { cursorDeleter.dismiss(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 1e46d2a8894..159fd47923a 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -725,6 +725,7 @@ Status runAggregate(OperationContext* opCtx, boost::intrusive_ptr<ExpressionContext> expCtx; auto curOp = CurOp::get(opCtx); auto catalog = CollectionCatalog::get(opCtx); + boost::optional<BSONObj> telemetryKey; { // If we are in a transaction, check whether the parsed pipeline supports being in @@ -1095,6 +1096,19 @@ Status runAggregate(OperationContext* opCtx, curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; + telemetryKey = telemetry::shouldCollectTelemetry(request, opCtx); + // Build the telemetry key and store it in the operation context + if (telemetryKey) { + // TODO SERVER-71315: should we store it in the CurOp instead? (or even PlanExplainer) + opCtx->storeQueryBSON(*telemetryKey); + } + + + if (telemetryKey) { + telemetry::collectTelemetry( + opCtx->getServiceContext(), *telemetryKey, curOp->debug(), true); + } + // For an optimized away pipeline, signal the cache that a query operation has completed. // For normal pipelines this is done in DocumentSourceCursor. if (ctx) { diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 29db5f6385b..173753c28ec 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -416,6 +416,18 @@ public: // cache. return Microseconds{-1}; } + + BSONObj getTelemetryKey() const { + return _originalQueryBSON; + } + + void storeQueryBSON(BSONObj originalBSON) { + if (_originalQueryBSON.isEmpty()) { + _originalQueryBSON = originalBSON.copy(); + } + } + + /** * Sets the deadline for this operation to the given point in time. * @@ -822,6 +834,7 @@ private: std::unique_ptr<Timer> _planningTimer = nullptr; Microseconds _timeElapsedPlanning; + BSONObj _originalQueryBSON; bool _writesAreReplicated = true; bool _shouldIncrementLatencyStats = true; bool _inMultiDocumentTransaction = false; diff --git a/src/mongo/db/pipeline/document_source_telemetry.h b/src/mongo/db/pipeline/document_source_telemetry.h index d3a7dfd78e2..7d702aeb2b0 100644 --- a/src/mongo/db/pipeline/document_source_telemetry.h +++ b/src/mongo/db/pipeline/document_source_telemetry.h @@ -36,6 +36,8 @@ namespace mongo { +using namespace telemetry; + class DocumentSourceTelemetry final : public DocumentSource { public: static constexpr StringData kStageName = "$telemetry"_sd; @@ -143,13 +145,7 @@ private: */ void waitForEmpty() { stdx::unique_lock lk{_mutex}; - _waitForEmpty.wait(lk, [&] { - try { - return !_queue.tryPop(); - } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueConsumed>&) { - return true; - } - }); + _waitForEmpty.wait(lk, [&] { return _queue.getStats().queueDepth == 0; }); } private: diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index d7f2cc76225..67ed5841fd4 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -47,7 +47,7 @@ PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContex Microseconds timeElapsedPlanning) : _expCtx(std::move(expCtx)), _pipeline(std::move(pipeline)), - _planExplainer{_pipeline.get(), timeElapsedPlanning}, + _planExplainer{_pipeline.get(), timeElapsedPlanning, _expCtx->opCtx->getTelemetryKey()}, _resumableScanType{resumableScanType} { // Pipeline plan executors must always have an ExpressionContext. invariant(_expCtx); diff --git a/src/mongo/db/pipeline/plan_explainer_pipeline.h b/src/mongo/db/pipeline/plan_explainer_pipeline.h index a5c92ea4064..14d654487ba 100644 --- a/src/mongo/db/pipeline/plan_explainer_pipeline.h +++ b/src/mongo/db/pipeline/plan_explainer_pipeline.h @@ -39,8 +39,10 @@ namespace mongo { */ class PlanExplainerPipeline final : public PlanExplainer { public: - PlanExplainerPipeline(const Pipeline* pipeline, Microseconds timeElapsedPlanning) - : PlanExplainer{timeElapsedPlanning}, _pipeline{pipeline} {} + PlanExplainerPipeline(const Pipeline* pipeline, + Microseconds timeElapsedPlanning, + BSONObj telemetryKey) + : PlanExplainer{timeElapsedPlanning, telemetryKey}, _pipeline{pipeline} {} bool isMultiPlan() const final { return false; diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 1c320503176..02890313b21 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -375,9 +375,12 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/concurrency/lock_manager', + "$BUILD_DIR/mongo/rpc/client_metadata", '$BUILD_DIR/mongo/util/processinfo', + 'command_request_response', 'memory_util', 'query_knobs', + 'rate_limiting', ], ) diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index e764578fad4..693be64c962 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -133,7 +133,8 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, _workingSet(std::move(ws)), _qs(std::move(qs)), _root(std::move(rt)), - _planExplainer(plan_explainer_factory::make(_root.get(), timeElapsedPlanning)), + _planExplainer( + plan_explainer_factory::make(_root.get(), timeElapsedPlanning, opCtx->getTelemetryKey())), _mustReturnOwnedBson(returnOwnedBson), _nss(std::move(nss)) { invariant(!_expCtx || _expCtx->opCtx == _opCtx); diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index e6d6be12927..791408aeb9c 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -127,6 +127,7 @@ PlanExecutorSBE::PlanExecutorSBE(OperationContext* opCtx, isMultiPlan, isCachedCandidate, timeElapsedPlanning, + opCtx->getTelemetryKey(), _rootData.debugInfo); } diff --git a/src/mongo/db/query/plan_explainer.h b/src/mongo/db/query/plan_explainer.h index f19d9aad462..3f377da7821 100644 --- a/src/mongo/db/query/plan_explainer.h +++ b/src/mongo/db/query/plan_explainer.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/bson/bsonobj.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/query/classic_plan_cache.h" #include "mongo/db/query/explain_options.h" @@ -58,14 +59,22 @@ public: using PlanStatsDetails = std::pair<BSONObj, boost::optional<PlanSummaryStats>>; PlanExplainer() {} - PlanExplainer(const QuerySolution* solution, Microseconds timeElapsedPlanning) + PlanExplainer(const QuerySolution* solution, + Microseconds timeElapsedPlanning, + BSONObj telemetryKey) : _enumeratorExplainInfo{solution ? solution->_enumeratorExplainInfo : PlanEnumeratorExplainInfo{}}, - _timeElapsedPlanning{timeElapsedPlanning} {} - PlanExplainer(Microseconds timeElapsedPlanning) : _timeElapsedPlanning{timeElapsedPlanning} {} + _timeElapsedPlanning{timeElapsedPlanning}, + _telemetryKey{telemetryKey} {} + PlanExplainer(Microseconds timeElapsedPlanning, BSONObj telemetryKey) + : _timeElapsedPlanning{timeElapsedPlanning}, _telemetryKey{telemetryKey} {} PlanExplainer(const PlanEnumeratorExplainInfo& info) : _enumeratorExplainInfo{info} {} - PlanExplainer(const PlanEnumeratorExplainInfo& info, Microseconds timeElapsedPlanning) - : _enumeratorExplainInfo{info}, _timeElapsedPlanning{timeElapsedPlanning} {} + PlanExplainer(const PlanEnumeratorExplainInfo& info, + Microseconds timeElapsedPlanning, + BSONObj telemetryKey) + : _enumeratorExplainInfo{info}, + _timeElapsedPlanning{timeElapsedPlanning}, + _telemetryKey{telemetryKey} {} virtual ~PlanExplainer() = default; @@ -144,9 +153,16 @@ public: void updateEnumeratorExplainInfo(const PlanEnumeratorExplainInfo& other) { _enumeratorExplainInfo.merge(other); } + Microseconds getTimeElapsedPlanning() const { + return _timeElapsedPlanning; + } + BSONObj getTelemetryKey() const { + return _telemetryKey; + } protected: PlanEnumeratorExplainInfo _enumeratorExplainInfo; Microseconds _timeElapsedPlanning{0}; + BSONObj _telemetryKey; }; } // namespace mongo diff --git a/src/mongo/db/query/plan_explainer_factory.cpp b/src/mongo/db/query/plan_explainer_factory.cpp index 8ca029050db..5eace0d82ac 100644 --- a/src/mongo/db/query/plan_explainer_factory.cpp +++ b/src/mongo/db/query/plan_explainer_factory.cpp @@ -42,8 +42,10 @@ std::unique_ptr<PlanExplainer> make(PlanStage* root) { return std::make_unique<PlanExplainerImpl>(root); } -std::unique_ptr<PlanExplainer> make(PlanStage* root, Microseconds timElapsedPlanning) { - return std::make_unique<PlanExplainerImpl>(root, timElapsedPlanning); +std::unique_ptr<PlanExplainer> make(PlanStage* root, + Microseconds timeElapsedPlanning, + BSONObj telemetryKey) { + return std::make_unique<PlanExplainerImpl>(root, timeElapsedPlanning, telemetryKey); } std::unique_ptr<PlanExplainer> make(PlanStage* root, const PlanEnumeratorExplainInfo& explainInfo) { return std::make_unique<PlanExplainerImpl>(root, explainInfo); @@ -72,7 +74,8 @@ std::unique_ptr<PlanExplainer> make(sbe::PlanStage* root, std::move(rejectedCandidates), isMultiPlan, false, /* isFromPlanCache */ - Microseconds{0}, /* planningTimeElapsed*/ + Microseconds{0}, /* timeElapsedPlanning*/ + BSONObj(), debugInfoSBE); } @@ -85,6 +88,7 @@ std::unique_ptr<PlanExplainer> make( bool isMultiPlan, bool isFromPlanCache, Microseconds timeElapsedPlanning, + BSONObj telemetryKey, std::shared_ptr<const plan_cache_debug_info::DebugInfoSBE> debugInfoSBE) { // TODO SERVER-64882: Consider invariant(debugInfoSBE) as we may not need to create a // DebugInfoSBE from QuerySolution after the feature flag is removed. We currently need it @@ -102,6 +106,7 @@ std::unique_ptr<PlanExplainer> make( isMultiPlan, isFromPlanCache, timeElapsedPlanning, + telemetryKey, debugInfoSBE); } } // namespace mongo::plan_explainer_factory diff --git a/src/mongo/db/query/plan_explainer_factory.h b/src/mongo/db/query/plan_explainer_factory.h index 3c51811a3fd..d804666a8d0 100644 --- a/src/mongo/db/query/plan_explainer_factory.h +++ b/src/mongo/db/query/plan_explainer_factory.h @@ -40,7 +40,9 @@ namespace mongo::plan_explainer_factory { std::unique_ptr<PlanExplainer> make(PlanStage* root); -std::unique_ptr<PlanExplainer> make(PlanStage* root, Microseconds timeElapsedPlanning); +std::unique_ptr<PlanExplainer> make(PlanStage* root, + Microseconds timeElapsedPlanning, + BSONObj telemetryKey); std::unique_ptr<PlanExplainer> make(PlanStage* root, const PlanEnumeratorExplainInfo& enumeratorInfo); @@ -65,5 +67,6 @@ std::unique_ptr<PlanExplainer> make( bool isMultiPlan, bool isFromPlanCache, Microseconds timeElapsedPlanning, + BSONObj telemetryKey, std::shared_ptr<const plan_cache_debug_info::DebugInfoSBE> debugInfo); } // namespace mongo::plan_explainer_factory diff --git a/src/mongo/db/query/plan_explainer_impl.h b/src/mongo/db/query/plan_explainer_impl.h index 60cf650fb7f..cf7264d6217 100644 --- a/src/mongo/db/query/plan_explainer_impl.h +++ b/src/mongo/db/query/plan_explainer_impl.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/bson/bsonobj.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/query/plan_enumerator_explain_info.h" #include "mongo/db/query/plan_explainer.h" @@ -47,12 +48,13 @@ class PlanExplainerImpl final : public PlanExplainer { public: PlanExplainerImpl(PlanStage* root, const PlanEnumeratorExplainInfo& explainInfo, - Microseconds timeElapsedPlanning) - : PlanExplainer{explainInfo, timeElapsedPlanning}, _root{root} {} + Microseconds timeElapsedPlanning, + BSONObj telemetryKey) + : PlanExplainer{explainInfo, timeElapsedPlanning, telemetryKey}, _root{root} {} PlanExplainerImpl(PlanStage* root, const PlanEnumeratorExplainInfo& explainInfo) : PlanExplainer{explainInfo}, _root{root} {} - PlanExplainerImpl(PlanStage* root, Microseconds timeElapsedPlanning) - : PlanExplainer{timeElapsedPlanning}, _root{root} {} + PlanExplainerImpl(PlanStage* root, Microseconds timeElapsedPlanning, BSONObj telemetryKey) + : PlanExplainer{timeElapsedPlanning, telemetryKey}, _root{root} {} PlanExplainerImpl(PlanStage* root) : _root{root} {} const ExplainVersion& getVersion() const final; bool isMultiPlan() const final; diff --git a/src/mongo/db/query/plan_explainer_sbe.h b/src/mongo/db/query/plan_explainer_sbe.h index 28b696f3c75..be145591518 100644 --- a/src/mongo/db/query/plan_explainer_sbe.h +++ b/src/mongo/db/query/plan_explainer_sbe.h @@ -51,8 +51,9 @@ public: bool isMultiPlan, bool isCachedPlan, Microseconds timeElapsedPlanning, + BSONObj telemetryKey, std::shared_ptr<const plan_cache_debug_info::DebugInfoSBE> debugInfo) - : PlanExplainer{solution, timeElapsedPlanning}, + : PlanExplainer{solution, timeElapsedPlanning, telemetryKey}, _root{root}, _rootData{data}, _solution{solution}, diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl index 0b743fc94a3..a4cdc297567 100644 --- a/src/mongo/db/query/query_knobs.idl +++ b/src/mongo/db/query/query_knobs.idl @@ -952,12 +952,12 @@ server_parameters: internalQueryConfigureTelemetrySamplingRate: description: "The maximum number of queries per second that are sampled for query telemetry. If the rate of queries goes above this number, then rate limiting will kick in, and any - further queries will not be sampled. The default is INT_MAX, effectively meaning that all - queries will be sampled. This can be set to 0 to turn telemetry off completely." + further queries will not be sampled. To sample all queries, this can be set to the INT_MAX. + This can be set to 0 to turn telemetry off completely." set_at: [ startup, runtime ] cpp_varname: "queryTelemetrySamplingRate" cpp_vartype: AtomicWord<int> - default: 2147483647 + default: 0 internalQueryConfigureTelemetryCacheSize: description: "The maximum amount of memory that the system will allocate for the query telemetry diff --git a/src/mongo/db/query/rate_limiting.h b/src/mongo/db/query/rate_limiting.h index 959ac52692e..1083562ae06 100644 --- a/src/mongo/db/query/rate_limiting.h +++ b/src/mongo/db/query/rate_limiting.h @@ -50,6 +50,13 @@ public: RateLimiting(RequestCount samplingRate, Milliseconds timePeriod = Seconds{1}); /* + * Getter for the sampling rate. + */ + RequestCount getSamplingRate() { + return _samplingRate; + } + + /* * A simple method for rate limiting. Returns false if we have reached the request limit for the * current time window; otherwise, returns true and adds the request to the count for the * current window. If we have passed the end of the previous window, the slate is wiped clean. diff --git a/src/mongo/db/query/sbe_cached_solution_planner.cpp b/src/mongo/db/query/sbe_cached_solution_planner.cpp index 2d43a763c83..fd3bb312c8a 100644 --- a/src/mongo/db/query/sbe_cached_solution_planner.cpp +++ b/src/mongo/db/query/sbe_cached_solution_planner.cpp @@ -129,13 +129,13 @@ CandidatePlans CachedSolutionPlanner::plan( candidate.root.get(), &candidate.data, candidate.solution.get(), - {}, /* optimizedData */ - {}, /* rejectedCandidates */ - false, /* isMultiPlan */ - true, /* isFromPlanCache */ - _opCtx->getElapsedQueryPlanningTime() /* metric stored in PlanExplainer via PlanExecutor + {}, /* optimizedData */ + {}, /* rejectedCandidates */ + false, /* isMultiPlan */ + true, /* isFromPlanCache */ + _opCtx->getElapsedQueryPlanningTime(), /* metric stored in PlanExplainer via PlanExecutor construction*/ - , + _opCtx->getTelemetryKey(), candidate.data.debugInfo ? std::make_unique<plan_cache_debug_info::DebugInfoSBE>(*candidate.data.debugInfo) : nullptr); diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index 468ebb6a396..e0ed2c4018e 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -29,18 +29,30 @@ #include "mongo/db/query/telemetry.h" +#include "mongo/crypto/hash_block.h" +#include "mongo/crypto/sha256_block.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/locker.h" +#include "mongo/db/curop.h" +#include "mongo/db/pipeline/aggregate_command_gen.h" +#include "mongo/db/query/find_command_gen.h" +#include "mongo/db/query/plan_explainer.h" #include "mongo/db/query/query_planner_params.h" +#include "mongo/db/query/rate_limiting.h" +#include "mongo/db/query/telemetry_util.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/metadata/client_metadata.h" #include "mongo/util/processinfo.h" +#include "mongo/util/system_clock_source.h" #include <cstddef> #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery namespace mongo { +namespace telemetry { + namespace { /** @@ -49,8 +61,9 @@ namespace { * * - Updating the telemetry store uses the `getTelemetryStore()` method. The telemetry store * instance is obtained, entries are looked up and mutated, or created anew. - * - The telemetry store is "reset". This involves atomically allocating a new instance, once there - * are no more updaters (readers of the store "pointer"), and returning the existing instance. + * - The telemetry store is "reset". This involves atomically allocating a new instance, once + * there are no more updaters (readers of the store "pointer"), and returning the existing + * instance. */ class TelemetryStoreManager { public: @@ -63,8 +76,8 @@ public: /** * Acquire the instance of the telemetry store. The telemetry store is mutable and a shared - * "read lock" is obtained on the instance. That is, the telemetry store instance will not be - * replaced. + * "read lock" is obtained on the instance. That is, the telemetry store instance will not + * be replaced. */ std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStore() { return std::make_pair(&*_telemetryStore, Lock::SharedLock{&_instanceLock, _instanceMutex}); @@ -104,8 +117,8 @@ public: auto cappedSize = memory_util::capMemorySize( newSizeBytes /*requestedSize*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); - /* If capped size is less than requested size, the telemetry store has been capped at its - * upper limit*/ + /* If capped size is less than requested size, the telemetry store has been capped at + * its upper limit*/ if (cappedSize < newSizeBytes) { LOGV2_DEBUG(7106503, 1, @@ -118,6 +131,8 @@ public: } }; +const auto telemetryRateLimiter = + ServiceContext::declareDecoration<std::unique_ptr<RateLimiting>>(); ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ "TelemetryStoreManagerRegisterer", [](ServiceContext* serviceCtx) { @@ -128,8 +143,8 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ auto size = memory_util::getRequestedMemSizeInBytes(status.getValue()); auto cappedStoreSize = memory_util::capMemorySize( size /*requestedSizeBytes*/, 1 /*maximumSizeGB*/, 25 /*percentTotalSystemMemory*/); - /* If capped size is less than requested size, the telemetry store has been capped at its - * upper limit*/ + // If capped size is less than requested size, the telemetry store has been capped at its + // upper limit. if (cappedStoreSize < size) { LOGV2_DEBUG(7106502, 1, @@ -137,19 +152,159 @@ ServiceContext::ConstructorActionRegisterer telemetryStoreManagerRegisterer{ "cappedSize"_attr = cappedStoreSize); } auto&& globalTelemetryStoreManager = telemetryStoreDecoration(serviceCtx); - globalTelemetryStoreManager = std::make_unique<TelemetryStoreManager>( - serviceCtx, cappedStoreSize, ProcessInfo::getNumCores()); + const int kNumPartitions = 100; // the more the merrier. + globalTelemetryStoreManager = + std::make_unique<TelemetryStoreManager>(serviceCtx, cappedStoreSize, kNumPartitions); + // TODO there will be a rate limiter initialized somewhere, and we can get the value from + // there to save a .load(). We need the rate limiter to do rate limiting here anyway. int + // samplingRate = queryTelemetrySamplingRate.load(); Quick escape if it's turned off? if + // (!samplingRate) { + // return; + //} + telemetryRateLimiter(serviceCtx) = + std::make_unique<RateLimiting>(queryTelemetrySamplingRate.load()); }}; +bool isTelemetryEnabled(const ServiceContext* serviceCtx) { + return telemetryRateLimiter(serviceCtx)->getSamplingRate() > 0; +} + +/** + * Internal check for whether we should collect metrics. This checks the rate limiting + * configuration for a global on/off decision and, if enabled, delegates to the rate limiter. + */ +bool shouldCollect(const ServiceContext* serviceCtx) { + // Quick escape if telemetry is turned off. + if (!isTelemetryEnabled(serviceCtx)) { + return false; + } + // Check if rate limiting allows us to accumulate. + if (!telemetryRateLimiter(serviceCtx)->handleRequestSlidingWindow()) { + return false; + } + // TODO SERVER-71244: check if it's a FLE collection here (maybe pass in the request) + return true; +} + +/** + * Add a field to the find op's telemetry key. The `value` will be redacted. + */ +void addToFindKey(BSONObjBuilder& builder, const StringData& fieldName, const BSONObj& value) { + serializeBSONWhenNotEmpty(value.redact(false), fieldName, &builder); +} + } // namespace +boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request, + const OperationContext* opCtx) { + if (request.getEncryptionInformation()) { + return {}; + } + + if (!shouldCollect(opCtx->getServiceContext())) { + return {}; + } + + BSONObjBuilder telemetryKey; + BSONObjBuilder pipelineBuilder = telemetryKey.subarrayStart("pipeline"_sd); + for (auto&& stage : request.getPipeline()) { + auto el = stage.firstElement(); + BSONObjBuilder stageBuilder = pipelineBuilder.subobjStart("stage"_sd); + stageBuilder.append(el.fieldNameStringData(), el.Obj().redact(false)); + stageBuilder.done(); + } + pipelineBuilder.done(); + telemetryKey.append("namespace", request.getNamespace().toString()); + if (request.getReadConcern()) { + telemetryKey.append("readConcern", *request.getReadConcern()); + } + if (auto metadata = ClientMetadata::get(opCtx->getClient())) { + telemetryKey.append("applicationName", metadata->getApplicationName()); + } + return {telemetryKey.obj()}; +} + +boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request, + const NamespaceString& collection, + const OperationContext* opCtx) { + if (request.getEncryptionInformation()) { + return {}; + } + + if (!shouldCollect(opCtx->getServiceContext())) { + return {}; + } + + BSONObjBuilder telemetryKey; + BSONObjBuilder findBuilder = telemetryKey.subobjStart("find"_sd); + auto findBson = request.toBSON({}); + for (auto&& findEntry : findBson) { + if (findEntry.isABSONObj()) { + telemetryKey.append(findEntry.fieldNameStringData(), findEntry.Obj().redact(false)); + } else { + telemetryKey.append(findEntry.fieldNameStringData(), "###"_sd); + } + } + findBuilder.done(); + telemetryKey.append("namespace", collection.toString()); + if (request.getReadConcern()) { + telemetryKey.append("readConcern", *request.getReadConcern()); + } + if (auto metadata = ClientMetadata::get(opCtx->getClient())) { + telemetryKey.append("applicationName", metadata->getApplicationName()); + } + return {telemetryKey.obj()}; +} + +boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx, + const BSONObj& telemetryKey) { + if (telemetryKey.isEmpty() || !shouldCollect(opCtx->getServiceContext())) { + return {}; + } + return {telemetryKey}; +} + std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead( - ServiceContext* serviceCtx) { + const ServiceContext* serviceCtx) { return telemetryStoreDecoration(serviceCtx)->getTelemetryStore(); } -std::unique_ptr<TelemetryStore> resetTelemetryStore(ServiceContext* serviceCtx) { +std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx) { return telemetryStoreDecoration(serviceCtx)->resetTelemetryStore(); } +void collectTelemetry(const ServiceContext* serviceCtx, + const BSONObj& key, + const OpDebug& opDebug, + bool isExec) { + auto&& getTelemetryStoreResult = getTelemetryStoreForRead(serviceCtx); + auto telemetryStore = getTelemetryStoreResult.first; + auto&& result = telemetryStore->getWithPartitionLock(key); + auto statusWithMetrics = result.first; + auto partitionLock = std::move(result.second); + auto metrics = [&]() { + if (statusWithMetrics.isOK()) { + return statusWithMetrics.getValue(); + } else { + TelemetryMetrics metrics; + telemetryStore->put(key, metrics, partitionLock); + auto newMetrics = partitionLock->get(key); + // This can happen if the budget is immediately exceeded. Specifically if the there is + // not enough room for a single new entry if the number of partitions is too high + // relative to the size. + tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); + return &newMetrics.getValue()->second; + } + }(); + if (isExec) { + metrics->execCount++; + metrics->queryOptMicros.aggregate(opDebug.planningTime.count()); + } + metrics->docsReturned.aggregate(opDebug.nreturned); + metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0)); + metrics->keysScanned.aggregate(opDebug.additiveMetrics.keysExamined.value_or(0)); + metrics->lastExecutionMicros = opDebug.executionTime.count(); + metrics->queryExecMicros.aggregate(opDebug.executionTime.count()); +} +} // namespace telemetry } // namespace mongo diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h index 1e60df54e04..6fd59884ac4 100644 --- a/src/mongo/db/query/telemetry.h +++ b/src/mongo/db/query/telemetry.h @@ -31,17 +31,26 @@ #include "mongo/base/status.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/curop.h" #include "mongo/db/query/partitioned_cache.h" +#include "mongo/db/query/plan_explainer.h" #include "mongo/db/query/util/memory_util.h" #include "mongo/db/service_context.h" namespace mongo { +class OpDebug; +class AggregateCommandRequest; +class FindCommandRequest; + +namespace { /** * Type we use to render values to BSON. */ using BSONNumeric = long long; +} // namespace +namespace telemetry { /** * An aggregated metric stores a compressed view of data. It balances the loss of information with * the reduction in required storage. @@ -58,13 +67,19 @@ struct AggregatedMetric { sumOfSquares += val * val; } - BSONObj toBSON() const { - return BSON("sum" << (BSONNumeric)sum << "max" << (BSONNumeric)sum << "min" - << (BSONNumeric)sum << "sumOfSquares" << (BSONNumeric)sum); + void appendTo(BSONObjBuilder& builder, const StringData& fieldName) const { + BSONObjBuilder metricsBuilder = builder.subobjStart(fieldName); + metricsBuilder.append("sum", (BSONNumeric)sum); + metricsBuilder.append("max", (BSONNumeric)max); + metricsBuilder.append("min", (BSONNumeric)min); + metricsBuilder.append("sumOfSquares", (BSONNumeric)sumOfSquares); + metricsBuilder.done(); } uint64_t sum = 0; - uint64_t min = 0; + // Default to the _signed_ maximum (which fits in unsigned range) because we cast to BSONNumeric + // when serializing. + uint64_t min = (uint64_t)std::numeric_limits<int64_t>::max; uint64_t max = 0; /** @@ -76,19 +91,27 @@ struct AggregatedMetric { class TelemetryMetrics { public: + TelemetryMetrics() : firstSeenTimestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0) {} + BSONObj toBSON() const { BSONObjBuilder builder{sizeof(TelemetryMetrics) + 100}; builder.append("lastExecutionMicros", (BSONNumeric)lastExecutionMicros); builder.append("execCount", (BSONNumeric)execCount); - builder.append("queryOptTime", queryOptMicros.toBSON()); - builder.append("queryExecMicros", queryExecMicros.toBSON()); - builder.append("docsReturned", docsReturned.toBSON()); - builder.append("docsScanned", docsScanned.toBSON()); - builder.append("keysScanned", keysScanned.toBSON()); + queryOptMicros.appendTo(builder, "queryOptMicros"); + queryExecMicros.appendTo(builder, "queryExecMicros"); + docsReturned.appendTo(builder, "docsReturned"); + docsScanned.appendTo(builder, "docsScanned"); + keysScanned.appendTo(builder, "keysScanned"); + builder.append("firstSeenTimestamp", firstSeenTimestamp); return builder.obj(); } /** + * Timestamp for when this query shape was added to the store. Set on construction. + */ + const Timestamp firstSeenTimestamp; + + /** * Last execution time in microseconds. */ uint64_t lastExecutionMicros = 0; @@ -132,8 +155,41 @@ using TelemetryStore = PartitionedCache<BSONObj, /** * Acquire a reference to the global telemetry store. */ -std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead(ServiceContext* serviceCtx); +std::pair<TelemetryStore*, Lock::ResourceLock> getTelemetryStoreForRead( + const ServiceContext* serviceCtx); + +std::unique_ptr<TelemetryStore> resetTelemetryStore(const ServiceContext* serviceCtx); -std::unique_ptr<TelemetryStore> resetTelemetryStore(ServiceContext* serviceCtx); +bool isTelemetryEnabled(const ServiceContext* serviceCtx); + +/** + * Should we collect telemetry for a request? The decision is made based on the feature flag and + * telemetry parameters such as rate limiting. + * + * If the return value is a telemetry key in the form of BSONObj, this indicates the telemetry + * should be collected. Otherwise, telemetry should not be collected. + * + * Note that calling this affects internal state. It should be called once for each request for + * which telemetry may be collected. + */ +boost::optional<BSONObj> shouldCollectTelemetry(const AggregateCommandRequest& request, + const OperationContext* opCtx); + +boost::optional<BSONObj> shouldCollectTelemetry(const FindCommandRequest& request, + const NamespaceString& collection, + const OperationContext* opCtx); + +boost::optional<BSONObj> shouldCollectTelemetry(const OperationContext* opCtx, + const BSONObj& telemetryKey); + +/** + * Collect telemetry for the operation identified by `key`. The `isExec` flag should be set if it's + * the beginning of execution (first batch) of results and not set for subsequent getMore() calls. + */ +void collectTelemetry(const ServiceContext* serviceCtx, + const BSONObj& key, + const OpDebug& opDebug, + bool isExec); +} // namespace telemetry } // namespace mongo diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp index 4edc6e1ec88..c05a6fb0aac 100644 --- a/src/mongo/db/query/telemetry_store_test.cpp +++ b/src/mongo/db/query/telemetry_store_test.cpp @@ -32,7 +32,7 @@ #include "mongo/db/query/telemetry.h" #include "mongo/unittest/unittest.h" -namespace mongo { +namespace mongo::telemetry { class TelemetryStoreTest : public unittest::Test { protected: @@ -96,4 +96,4 @@ TEST_F(TelemetryStoreTest, BasicUsage) { ASSERT_EQ(numKeys, 2); } -} // namespace mongo +} // namespace mongo::telemetry |