summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    Davis Haupt <davis.haupt@mongodb.com>  2023-02-07 14:28:27 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-02-07 15:11:49 +0000
commit   5b1a7a5cbe024b7bb123abb91020eb9997120259 (patch)
tree     5b1e0d91349e93707b28167a5b51b0e3768e8482
parent   713c60157fea0cab9da39ff90a8d0a8eb6583a1b (diff)
download mongo-5b1a7a5cbe024b7bb123abb91020eb9997120259.tar.gz
SERVER-73407 Collect telemetry on mongos for find and agg (tag: r6.3.0-rc0)
-rw-r--r--  jstests/noPassthrough/telemetry_collect_on_mongos.js | 142
-rw-r--r--  src/mongo/s/commands/cluster_find_cmd.h              |  10
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp              |   6
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                   |   3
4 files changed, 160 insertions, 1 deletions
diff --git a/jstests/noPassthrough/telemetry_collect_on_mongos.js b/jstests/noPassthrough/telemetry_collect_on_mongos.js
new file mode 100644
index 00000000000..51546adc0de
--- /dev/null
+++ b/jstests/noPassthrough/telemetry_collect_on_mongos.js
@@ -0,0 +1,142 @@
+/**
+ * Test that mongos is collecting metrics.
+ */
+
+(function() {
+"use strict";
+
+const st = new ShardingTest({
+ mongos: 1,
+ shards: 1,
+ config: 1,
+ rs: {nodes: 1},
+ mongosOptions: {
+ setParameter: {
+ internalQueryConfigureTelemetrySamplingRate: 2147483647,
+ featureFlagTelemetry: true,
+ }
+ },
+});
+
+// Redacted literal replacement string. This may change in the future, so it's factored out.
+const R = "###";
+
+const mongos = st.s;
+const db = mongos.getDB("test");
+const coll = db.coll;
+coll.insert({v: 1});
+coll.insert({v: 4});
+
+// Get the telemetry for a given database, filtering out the actual $telemetry call.
+const getTelemetry = (conn) => {
+ const result = conn.adminCommand({
+ aggregate: 1,
+ pipeline: [
+ {$telemetry: {}},
+ // Sort on telemetry key so entries are in a deterministic order.
+ {$sort: {key: 1}},
+ ],
+ cursor: {}
+ });
+ return result.cursor.firstBatch;
+};
+
+/**
+ * Verify that mongos is recording these metrics:
+ * - "firstSeenTimestamp"
+ * - "lastExecutionMicros"
+ * - "execCount"
+ * - "queryExecMicros"
+ * - "docsReturned"
+ */
+
+// This test can't predict exact timings, so just assert these three fields have been set (are
+// non-zero).
+const assertTelemetryMetricsSet = (metrics) => {
+ const {firstSeenTimestamp, lastExecutionMicros, queryExecMicros} = metrics;
+
+ assert.neq(timestampCmp(firstSeenTimestamp, Timestamp(0, 0)), 0);
+ assert.neq(lastExecutionMicros, NumberLong(0));
+
+ const distributionFields = ['sum', 'max', 'min', 'sumOfSquares'];
+ for (const field of distributionFields) {
+ assert.neq(queryExecMicros[field], NumberLong(0));
+ }
+};
+
+coll.find({v: {$gt: 0, $lt: 5}}).toArray();
+coll.find({v: {$gt: 2, $lt: 3}}).toArray();
+coll.find({v: {$gt: 0, $lt: 1}}).toArray();
+coll.find({v: {$gt: 0, $lt: 2}}).toArray();
+
+{
+ const telemetry = getTelemetry(db);
+ assert.eq(1, telemetry.length);
+ const {key, metrics} = telemetry[0];
+ const {docsReturned, execCount} = metrics;
+ assert.eq(4, execCount);
+ assert.eq(
+ {
+ find: {
+ find: R,
+ filter: {v: {$gt: R, $lt: R}},
+ readConcern: {level: R, provenance: R},
+ }
+ },
+ key,
+ );
+ assert.eq({
+ sum: NumberLong(3),
+ max: NumberLong(2),
+ min: NumberLong(0),
+ sumOfSquares: NumberLong(5),
+ },
+ docsReturned);
+ assertTelemetryMetricsSet(metrics);
+}
+
+coll.aggregate([
+ {$match: {v: {$gt: 0, $lt: 5}}},
+ {$project: {hello: "$world"}},
+]);
+coll.aggregate([
+ {$match: {v: {$gt: 0, $lt: 5}}},
+ {$project: {hello: "$world"}},
+]);
+coll.aggregate([
+ {$match: {v: {$gt: 2, $lt: 3}}},
+ {$project: {hello: "$universe"}},
+]);
+coll.aggregate([
+ {$match: {v: {$gt: 0, $lt: 2}}},
+ {$project: {hello: "$galaxy"}},
+]);
+
+{
+ const telemetry = getTelemetry(mongos.getDB("test"));
+ assert.eq(3, telemetry.length); // The $telemetry query for the last test is included in this
+ // call to $telemetry.
+ const {key, metrics} = telemetry[1];
+ const {docsReturned, execCount} = metrics;
+ assert.eq(4, execCount);
+ assert.eq({
+ sum: NumberLong(5),
+ max: NumberLong(2),
+ min: NumberLong(0),
+ sumOfSquares: NumberLong(9),
+ },
+ docsReturned);
+ assert.eq({
+ pipeline: [
+ {$match: {v: {$gt: R, $lt: R}}},
+ {$project: {hello: R}},
+ ],
+ namespace: "test.coll",
+ applicationName: "MongoDB Shell"
+ },
+ key);
+ assertTelemetryMetricsSet(metrics);
+}
+
+st.stop();
+}());
diff --git a/src/mongo/s/commands/cluster_find_cmd.h b/src/mongo/s/commands/cluster_find_cmd.h
index 360b2e6ce1b..52b1149e0fe 100644
--- a/src/mongo/s/commands/cluster_find_cmd.h
+++ b/src/mongo/s/commands/cluster_find_cmd.h
@@ -38,6 +38,7 @@
#include "mongo/db/fle_crud.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/telemetry.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -224,6 +225,10 @@ public:
ExtensionsCallbackNoop(),
MatchExpressionParser::kAllowAllSpecialFeatures));
+ if (!_didDoFLERewrite) {
+ telemetry::registerFindRequest(cq->getFindCommandRequest(), cq->nss(), opCtx);
+ }
+
try {
// Do the work to generate the first batch of results. This blocks waiting to get
// responses from the shard(s).
@@ -245,6 +250,7 @@ public:
}
firstBatch.setPartialResultsReturned(partialResultsReturned);
firstBatch.done(cursorId, cq->nss());
+ telemetry::recordExecution(opCtx, _didDoFLERewrite);
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
result->reset();
@@ -277,7 +283,7 @@ public:
* were supplied with the command, and sets the constant runtime values that will be
* forwarded to each shard.
*/
- static std::unique_ptr<FindCommandRequest> _parseCmdObjectToFindCommandRequest(
+ std::unique_ptr<FindCommandRequest> _parseCmdObjectToFindCommandRequest(
OperationContext* opCtx, NamespaceString nss, BSONObj cmdObj) {
auto findCommand = query_request_helper::makeFromFindCommand(
std::move(cmdObj),
@@ -301,6 +307,7 @@ public:
invariant(findCommand->getNamespaceOrUUID().nss());
processFLEFindS(
opCtx, findCommand->getNamespaceOrUUID().nss().get(), findCommand.get());
+ _didDoFLERewrite = true;
}
return findCommand;
@@ -308,6 +315,7 @@ public:
const OpMsgRequest& _request;
const DatabaseName _dbName;
+ bool _didDoFLERewrite{false};
};
};
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 5550b177258..5537e4effb3 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/query/explain_common.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/fle/server_rewrite.h"
+#include "mongo/db/query/telemetry.h"
#include "mongo/db/timeseries/timeseries_gen.h"
#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/db/views/resolved_view.h"
@@ -331,6 +332,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto shouldDoFLERewrite = ::mongo::shouldDoFLERewrite(request);
auto startsWithDocuments = liteParsedPipeline.startsWithDocuments();
+ if (!shouldDoFLERewrite) {
+ telemetry::registerAggRequest(request, opCtx);
+ }
+
// If the routing table is not already taken by the higher level, fill it now.
if (!cm) {
// If the routing table is valid, we obtain a reference to it. If the table is not valid,
@@ -553,6 +558,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
updateHostsTargetedMetrics(opCtx, namespaces.executionNss, cm, involvedNamespaces);
// Report usage statistics for each stage in the pipeline.
liteParsedPipeline.tickGlobalStageCounters();
+ telemetry::recordExecution(opCtx, shouldDoFLERewrite);
// Add 'command' object to explain output.
if (expCtx->explain) {
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 47f11cf4e29..2709b80f648 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_command_gen.h"
#include "mongo/db/query/query_planner_common.h"
+#include "mongo/db/query/telemetry.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/overflow_arithmetic.h"
@@ -849,6 +850,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitWithPinnedCursorDuringGetMoreBatch");
}
+ telemetry::registerGetMoreRequest(opCtx);
+
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
StatusWith<ClusterQueryResult> next =
Status{ErrorCodes::InternalError, "uninitialized cluster query result"};