diff options
author | Davis Haupt <davis.haupt@mongodb.com> | 2023-02-07 14:28:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-07 15:11:49 +0000 |
commit | 5b1a7a5cbe024b7bb123abb91020eb9997120259 (patch) | |
tree | 5b1e0d91349e93707b28167a5b51b0e3768e8482 | |
parent | 713c60157fea0cab9da39ff90a8d0a8eb6583a1b (diff) | |
download | mongo-5b1a7a5cbe024b7bb123abb91020eb9997120259.tar.gz |
SERVER-73407 Collect telemetry on mongos for find and agg (tag: r6.3.0-rc0)
-rw-r--r-- | jstests/noPassthrough/telemetry_collect_on_mongos.js | 142 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_cmd.h | 10 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 3 |
4 files changed, 160 insertions(+), 1 deletion(-)
diff --git a/jstests/noPassthrough/telemetry_collect_on_mongos.js b/jstests/noPassthrough/telemetry_collect_on_mongos.js new file mode 100644 index 00000000000..51546adc0de --- /dev/null +++ b/jstests/noPassthrough/telemetry_collect_on_mongos.js @@ -0,0 +1,142 @@ +/** + * Test that mongos is collecting metrics. + */ + +(function() { +"use strict"; + +const st = new ShardingTest({ + mongos: 1, + shards: 1, + config: 1, + rs: {nodes: 1}, + mongosOptions: { + setParameter: { + internalQueryConfigureTelemetrySamplingRate: 2147483647, + featureFlagTelemetry: true, + } + }, +}); + +// Redacted literal replacement string. This may change in the future, so it's factored out. +const R = "###"; + +const mongos = st.s; +const db = mongos.getDB("test"); +const coll = db.coll; +coll.insert({v: 1}); +coll.insert({v: 4}); + +// Get the telemetry for a given database, filtering out the actual $telemetry call. +const getTelemetry = (conn) => { + const result = conn.adminCommand({ + aggregate: 1, + pipeline: [ + {$telemetry: {}}, + // Sort on telemetry key so entries are in a deterministic order. + {$sort: {key: 1}}, + ], + cursor: {} + }); + return result.cursor.firstBatch; +}; + +/** + * Verify that mongos is recording these metrics: + * - "firstSeenTimestamp" + * - "lastExecutionMicros" + * - "execCount" + * - "queryExecMicros" + * - "docsReturned" + */ + +// This test can't predict exact timings, so just assert these three fields have been set (are +// non-zero). 
+const assertTelemetryMetricsSet = (metrics) => { + const {firstSeenTimestamp, lastExecutionMicros, queryExecMicros} = metrics; + + assert.neq(timestampCmp(firstSeenTimestamp, Timestamp(0, 0)), 0); + assert.neq(lastExecutionMicros, NumberLong(0)); + + const distributionFields = ['sum', 'max', 'min', 'sumOfSquares']; + for (const field of distributionFields) { + assert.neq(queryExecMicros[field], NumberLong(0)); + } +}; + +coll.find({v: {$gt: 0, $lt: 5}}).toArray(); +coll.find({v: {$gt: 2, $lt: 3}}).toArray(); +coll.find({v: {$gt: 0, $lt: 1}}).toArray(); +coll.find({v: {$gt: 0, $lt: 2}}).toArray(); + +{ + const telemetry = getTelemetry(db); + assert.eq(1, telemetry.length); + const {key, metrics} = telemetry[0]; + const {docsReturned, execCount} = metrics; + assert.eq(4, execCount); + assert.eq( + { + find: { + find: R, + filter: {v: {$gt: R, $lt: R}}, + readConcern: {level: R, provenance: R}, + } + }, + key, + ); + assert.eq({ + sum: NumberLong(3), + max: NumberLong(2), + min: NumberLong(0), + sumOfSquares: NumberLong(5), + }, + docsReturned); + assertTelemetryMetricsSet(metrics); +} + +coll.aggregate([ + {$match: {v: {$gt: 0, $lt: 5}}}, + {$project: {hello: "$world"}}, +]); +coll.aggregate([ + {$match: {v: {$gt: 0, $lt: 5}}}, + {$project: {hello: "$world"}}, +]); +coll.aggregate([ + {$match: {v: {$gt: 2, $lt: 3}}}, + {$project: {hello: "$universe"}}, +]); +coll.aggregate([ + {$match: {v: {$gt: 0, $lt: 2}}}, + {$project: {hello: "$galaxy"}}, +]); + +{ + const telemetry = getTelemetry(mongos.getDB("test")); + assert.eq(3, telemetry.length); // The $telemetry query for the last test is included in this + // call to $telemetry. 
+ const {key, metrics} = telemetry[1]; + const {docsReturned, execCount} = metrics; + assert.eq(4, execCount); + assert.eq({ + sum: NumberLong(5), + max: NumberLong(2), + min: NumberLong(0), + sumOfSquares: NumberLong(9), + }, + docsReturned); + assert.eq({ + pipeline: [ + {$match: {v: {$gt: R, $lt: R}}}, + {$project: {hello: R}}, + ], + namespace: "test.coll", + applicationName: "MongoDB Shell" + }, + key); + assertTelemetryMetricsSet(metrics); +} + +st.stop(); +}()); diff --git a/src/mongo/s/commands/cluster_find_cmd.h b/src/mongo/s/commands/cluster_find_cmd.h index 360b2e6ce1b..52b1149e0fe 100644 --- a/src/mongo/s/commands/cluster_find_cmd.h +++ b/src/mongo/s/commands/cluster_find_cmd.h @@ -38,6 +38,7 @@ #include "mongo/db/fle_crud.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/telemetry.h" #include "mongo/db/stats/counters.h" #include "mongo/db/views/resolved_view.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -224,6 +225,10 @@ public: ExtensionsCallbackNoop(), MatchExpressionParser::kAllowAllSpecialFeatures)); + if (!_didDoFLERewrite) { + telemetry::registerFindRequest(cq->getFindCommandRequest(), cq->nss(), opCtx); + } + try { // Do the work to generate the first batch of results. This blocks waiting to get // responses from the shard(s). @@ -245,6 +250,7 @@ public: } firstBatch.setPartialResultsReturned(partialResultsReturned); firstBatch.done(cursorId, cq->nss()); + telemetry::recordExecution(opCtx, _didDoFLERewrite); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { result->reset(); @@ -277,7 +283,7 @@ public: * were supplied with the command, and sets the constant runtime values that will be * forwarded to each shard. 
*/ - static std::unique_ptr<FindCommandRequest> _parseCmdObjectToFindCommandRequest( + std::unique_ptr<FindCommandRequest> _parseCmdObjectToFindCommandRequest( OperationContext* opCtx, NamespaceString nss, BSONObj cmdObj) { auto findCommand = query_request_helper::makeFromFindCommand( std::move(cmdObj), @@ -301,6 +307,7 @@ public: invariant(findCommand->getNamespaceOrUUID().nss()); processFLEFindS( opCtx, findCommand->getNamespaceOrUUID().nss().get(), findCommand.get()); + _didDoFLERewrite = true; } return findCommand; @@ -308,6 +315,7 @@ public: const OpMsgRequest& _request; const DatabaseName _dbName; + bool _didDoFLERewrite{false}; }; }; diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 5550b177258..5537e4effb3 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -55,6 +55,7 @@ #include "mongo/db/query/explain_common.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/fle/server_rewrite.h" +#include "mongo/db/query/telemetry.h" #include "mongo/db/timeseries/timeseries_gen.h" #include "mongo/db/timeseries/timeseries_options.h" #include "mongo/db/views/resolved_view.h" @@ -331,6 +332,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto shouldDoFLERewrite = ::mongo::shouldDoFLERewrite(request); auto startsWithDocuments = liteParsedPipeline.startsWithDocuments(); + if (!shouldDoFLERewrite) { + telemetry::registerAggRequest(request, opCtx); + } + // If the routing table is not already taken by the higher level, fill it now. if (!cm) { // If the routing table is valid, we obtain a reference to it. If the table is not valid, @@ -553,6 +558,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, updateHostsTargetedMetrics(opCtx, namespaces.executionNss, cm, involvedNamespaces); // Report usage statistics for each stage in the pipeline. 
liteParsedPipeline.tickGlobalStageCounters(); + telemetry::recordExecution(opCtx, shouldDoFLERewrite); // Add 'command' object to explain output. if (expCtx->explain) { diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 47f11cf4e29..2709b80f648 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -53,6 +53,7 @@ #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_command_gen.h" #include "mongo/db/query/query_planner_common.h" +#include "mongo/db/query/telemetry.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/platform/overflow_arithmetic.h" @@ -849,6 +850,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, "waitWithPinnedCursorDuringGetMoreBatch"); } + telemetry::registerGetMoreRequest(opCtx); + while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { StatusWith<ClusterQueryResult> next = Status{ErrorCodes::InternalError, "uninitialized cluster query result"}; |