author     Will Buerger <will.buerger@mongodb.com>           2023-03-01 17:27:23 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-03-01 21:26:19 +0000
commit     12f2e3d221ac01156db60f307d433b56340115c3 (patch)
tree       78cefa99ff44c7215c04d0a41777dcc7e90486a5
parent     c80ca4373ac767037563691c9dd69f57258f2756 (diff)
download   mongo-12f2e3d221ac01156db60f307d433b56340115c3.tar.gz
SERVER-73727: Aggregate mongos metrics in ClusterClientCursor
-rw-r--r--  jstests/noPassthrough/telemetry_collect_on_mongos.js | 330
-rw-r--r--  jstests/telemetry/telemetry_metrics_across_getMore_calls.js | 33
-rw-r--r--  src/mongo/db/clientcursor.cpp | 65
-rw-r--r--  src/mongo/db/clientcursor.h | 17
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp | 4
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp | 2
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp | 7
-rw-r--r--  src/mongo/db/curop.cpp | 3
-rw-r--r--  src/mongo/db/query/telemetry.cpp | 118
-rw-r--r--  src/mongo/db/query/telemetry.h | 35
-rw-r--r--  src/mongo/db/query/telemetry_store_test.cpp | 8
-rw-r--r--  src/mongo/s/commands/cluster_find_cmd.h | 1
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp | 2
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp | 25
-rw-r--r--  src/mongo/s/query/cluster_client_cursor.h | 14
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp | 20
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.h | 9
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.cpp | 33
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.h | 9
-rw-r--r--  src/mongo/s/query/cluster_find.cpp | 14
-rw-r--r--  src/mongo/s/query/store_possible_cursor.cpp | 2
21 files changed, 403 insertions, 348 deletions
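
The through-line of the patch below: instead of writing telemetry from CurOp on every operation, execution time and docs returned are now accumulated on the cursor itself (ClientCursor on mongod, ClusterClientCursor on mongos) and flushed to the telemetry store exactly once, when the cursor is disposed of or killed. A minimal sketch of that accumulate-then-flush pattern follows; the class and member names are illustrative, and only the new telemetry::writeTelemetry() signature is taken from the patch.

#include <cstdint>

#include <boost/optional.hpp>

#include "mongo/bson/bsonobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/telemetry.h"

namespace mongo {
// Sketch only; ClientCursor and ClusterClientCursorImpl each carry this state directly.
class TelemetryAccumulatorSketch {
public:
    // Called for the initial batch and for every getMore served by the cursor.
    void accumulate(uint64_t execMicros, uint64_t docsReturned) {
        _queryExecMicros += execMicros;
        _docsReturned += docsReturned;
    }

    // Called exactly once, when the cursor is disposed of or killed.
    void flush(OperationContext* opCtx, const boost::optional<BSONObj>& telemetryStoreKey) {
        if (telemetryStoreKey && opCtx) {
            telemetry::writeTelemetry(opCtx, telemetryStoreKey, _queryExecMicros, _docsReturned);
        }
    }

private:
    uint64_t _queryExecMicros = 0;
    uint64_t _docsReturned = 0;
};
}  // namespace mongo
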
diff --git a/jstests/noPassthrough/telemetry_collect_on_mongos.js b/jstests/noPassthrough/telemetry_collect_on_mongos.js
index b54a42f747e..2eee0e214d0 100644
--- a/jstests/noPassthrough/telemetry_collect_on_mongos.js
+++ b/jstests/noPassthrough/telemetry_collect_on_mongos.js
@@ -5,27 +5,29 @@
(function() {
"use strict";
-const st = new ShardingTest({
- mongos: 1,
- shards: 1,
- config: 1,
- rs: {nodes: 1},
- mongosOptions: {
- setParameter: {
- internalQueryConfigureTelemetrySamplingRate: 2147483647,
- featureFlagTelemetry: true,
- }
- },
-});
-
// Redacted literal replacement string. This may change in the future, so it's factored out.
const R = "###";
-const mongos = st.s;
-const db = mongos.getDB("test");
-const coll = db.coll;
-coll.insert({v: 1});
-coll.insert({v: 4});
+const setup = () => {
+ const st = new ShardingTest({
+ mongos: 1,
+ shards: 1,
+ config: 1,
+ rs: {nodes: 1},
+ mongosOptions: {
+ setParameter: {
+ internalQueryConfigureTelemetrySamplingRate: 2147483647,
+ featureFlagTelemetry: true,
+ }
+ },
+ });
+ const mongos = st.s;
+ const db = mongos.getDB("test");
+ const coll = db.coll;
+ coll.insert({v: 1});
+ coll.insert({v: 4});
+ return st;
+};
// Get the telemetry for a given database, filtering out the actual $telemetry call.
const getTelemetry = (conn) => {
@@ -41,18 +43,26 @@ const getTelemetry = (conn) => {
return result.cursor.firstBatch;
};
-/**
- * Verify that mongos is recording these metrics:
- * - "firstSeenTimestamp"
- * - "lastExecutionMicros"
- * - "execCount"
- * - "queryExecMicros"
- * - "docsReturned"
- */
+const assertExpectedResults = (results,
+ expectedTelemetryKey,
+ expectedExecCount,
+ expectedDocsReturnedSum,
+ expectedDocsReturnedMax,
+ expectedDocsReturnedMin,
+ expectedDocsReturnedSumOfSq) => {
+ const {key, metrics} = results;
+ assert.eq(expectedTelemetryKey, key);
+ assert.eq(expectedExecCount, metrics.execCount);
+ assert.docEq({
+ sum: NumberLong(expectedDocsReturnedSum),
+ max: NumberLong(expectedDocsReturnedMax),
+ min: NumberLong(expectedDocsReturnedMin),
+ sumOfSquares: NumberLong(expectedDocsReturnedSumOfSq)
+ },
+ metrics.docsReturned);
-// This test can't predict exact timings, so just assert these three fields have been set (are
-// non-zero).
-const assertTelemetryMetricsSet = (metrics) => {
+ // This test can't predict exact timings, so just assert these three fields have been set (are
+ // non-zero).
const {firstSeenTimestamp, lastExecutionMicros, queryExecMicros} = metrics;
assert.neq(timestampCmp(firstSeenTimestamp, Timestamp(0, 0)), 0);
@@ -64,82 +74,214 @@ const assertTelemetryMetricsSet = (metrics) => {
}
};
-coll.find({v: {$gt: 0, $lt: 5}}).toArray();
-coll.find({v: {$gt: 2, $lt: 3}}).toArray();
-coll.find({v: {$gt: 0, $lt: 1}}).toArray();
-coll.find({v: {$gt: 0, $lt: 2}}).toArray();
-
+// Assert that, for find queries, no telemetry results are written until a cursor has reached
+// exhaustion; ensure accurate results once they're written.
{
- const telemetry = getTelemetry(db);
- assert.eq(1, telemetry.length);
- const {key, metrics} = telemetry[0];
- const {docsReturned, execCount} = metrics;
- assert.eq(4, execCount);
- assert.eq(
- {
- find: {
- find: R,
- filter: {v: {$gt: R, $lt: R}},
- readConcern: {level: R, provenance: R},
- },
- namespace: `test.${coll.getName()}`,
- readConcern: {level: "local", provenance: "implicitDefault"},
- applicationName: "MongoDB Shell"
+ const st = setup();
+ const db = st.s.getDB("test");
+ const coll = db.coll;
+
+ const telemetryKey = {
+ find: {
+ find: R,
+ filter: {v: {$gt: R, $lt: R}},
+ batchSize: R,
+ readConcern: {level: R, provenance: R},
},
- key,
- );
- assert.eq({
- sum: NumberLong(3),
- max: NumberLong(2),
- min: NumberLong(0),
- sumOfSquares: NumberLong(5),
- },
- docsReturned);
- assertTelemetryMetricsSet(metrics);
-}
+ namespace: `test.${coll.getName()}`,
+ readConcern: {level: "local", provenance: "implicitDefault"},
+ applicationName: "MongoDB Shell"
+ };
+
+ const cursor = coll.find({v: {$gt: 0, $lt: 5}}).batchSize(1); // returns 1 doc
+
+ // Since the cursor hasn't been exhausted yet, ensure no telemetry results have been written
+ // yet.
+ let telemetry = getTelemetry(db);
+ assert.eq(0, telemetry.length);
-coll.aggregate([
- {$match: {v: {$gt: 0, $lt: 5}}},
- {$project: {hello: "$world"}},
-]);
-coll.aggregate([
- {$match: {v: {$gt: 0, $lt: 5}}},
- {$project: {hello: "$world"}},
-]);
-coll.aggregate([
- {$match: {v: {$gt: 2, $lt: 3}}},
- {$project: {hello: "$universe"}},
-]);
-coll.aggregate([
- {$match: {v: {$gt: 0, $lt: 2}}},
- {$project: {hello: "$galaxy"}},
-]);
+ // Run a getMore to exhaust the cursor, then ensure telemetry results have been written
+ // accurately. batchSize must be 2 so the cursor recognizes exhaustion.
+ assert.commandWorked(db.runCommand({
+ getMore: cursor.getId(),
+ collection: coll.getName(),
+ batchSize: 2
+ })); // returns 1 doc, exhausts the cursor
+ // The $telemetry query for the previous `getTelemetry` is included in this call to $telemetry.
+ telemetry = getTelemetry(db);
+ assert.eq(2, telemetry.length);
+ assertExpectedResults(telemetry[0],
+ telemetryKey,
+ /* expectedExecCount */ 1,
+ /* expectedDocsReturnedSum */ 2,
+ /* expectedDocsReturnedMax */ 2,
+ /* expectedDocsReturnedMin */ 2,
+ /* expectedDocsReturnedSumOfSq */ 4);
+
+ // Run more queries (to exhaustion) with the same query shape, and ensure telemetry results are
+ // accurate.
+ coll.find({v: {$gt: 2, $lt: 3}}).batchSize(10).toArray(); // returns 0 docs
+ coll.find({v: {$gt: 0, $lt: 1}}).batchSize(10).toArray(); // returns 0 docs
+    coll.find({v: {$gt: 0, $lt: 2}}).batchSize(10).toArray();  // returns 1 doc
+ telemetry = getTelemetry(db);
+ assert.eq(2, telemetry.length);
+ assertExpectedResults(telemetry[0],
+ telemetryKey,
+ /* expectedExecCount */ 4,
+ /* expectedDocsReturnedSum */ 3,
+ /* expectedDocsReturnedMax */ 2,
+ /* expectedDocsReturnedMin */ 0,
+ /* expectedDocsReturnedSumOfSq */ 5);
+
+ st.stop();
+}
+// Assert that, for agg queries, no telemetry results are written until a cursor has reached
+// exhaustion; ensure accurate results once they're written.
{
- const telemetry = getTelemetry(mongos.getDB("test"));
- assert.eq(3, telemetry.length); // The $telemetry query for the last test is included in this
- // call to $telemetry.
- const {key, metrics} = telemetry[1];
- const {docsReturned, execCount} = metrics;
- assert.eq(4, execCount);
- assert.eq({
- sum: NumberLong(5),
- max: NumberLong(2),
- min: NumberLong(0),
- sumOfSquares: NumberLong(9),
- },
- docsReturned);
- assert.eq({
+ const st = setup();
+ const db = st.s.getDB("test");
+ const coll = db.coll;
+
+ const telemetryKey = {
pipeline: [
{$match: {v: {$gt: R, $lt: R}}},
{$project: {hello: R}},
],
namespace: "test.coll",
applicationName: "MongoDB Shell"
- },
- key);
- assertTelemetryMetricsSet(metrics);
+ };
+
+ const cursor = coll.aggregate(
+ [
+ {$match: {v: {$gt: 0, $lt: 5}}},
+ {$project: {hello: "$world"}},
+ ],
+ {cursor: {batchSize: 1}}); // returns 1 doc
+
+ // Since the cursor hasn't been exhausted yet, ensure no telemetry results have been written
+ // yet.
+ let telemetry = getTelemetry(db);
+ assert.eq(0, telemetry.length);
+
+ // Run a getMore to exhaust the cursor, then ensure telemetry results have been written
+ // accurately. batchSize must be 2 so the cursor recognizes exhaustion.
+ assert.commandWorked(db.runCommand({
+ getMore: cursor.getId(),
+ collection: coll.getName(),
+ batchSize: 2
+ })); // returns 1 doc, exhausts the cursor
+ // The $telemetry query for the previous `getTelemetry` is included in this call to $telemetry.
+ telemetry = getTelemetry(db);
+ assert.eq(2, telemetry.length);
+ assertExpectedResults(telemetry[0],
+ telemetryKey,
+ /* expectedExecCount */ 1,
+ /* expectedDocsReturnedSum */ 2,
+ /* expectedDocsReturnedMax */ 2,
+ /* expectedDocsReturnedMin */ 2,
+ /* expectedDocsReturnedSumOfSq */ 4);
+
+ // Run more queries (to exhaustion) with the same query shape, and ensure telemetry results are
+ // accurate.
+ coll.aggregate([
+ {$match: {v: {$gt: 0, $lt: 5}}},
+ {$project: {hello: "$world"}},
+ ]); // returns 2 docs
+ coll.aggregate([
+ {$match: {v: {$gt: 2, $lt: 3}}},
+ {$project: {hello: "$universe"}},
+ ]); // returns 0 docs
+ coll.aggregate([
+ {$match: {v: {$gt: 0, $lt: 2}}},
+ {$project: {hello: "$galaxy"}},
+ ]); // returns 1 doc
+ telemetry = getTelemetry(db);
+ assert.eq(2, telemetry.length);
+ assertExpectedResults(telemetry[0],
+ telemetryKey,
+ /* expectedExecCount */ 4,
+ /* expectedDocsReturnedSum */ 5,
+ /* expectedDocsReturnedMax */ 2,
+ /* expectedDocsReturnedMin */ 0,
+ /* expectedDocsReturnedSumOfSq */ 9);
+
+ st.stop();
}
-st.stop();
+// Assert on batchSize-limited find queries that killCursors will write metrics with partial results
+// to the telemetry store.
+{
+ const st = setup();
+ const db = st.s.getDB("test");
+ const coll = db.coll;
+
+ const telemetryKey = {
+ find: {
+ find: R,
+ filter: {v: {$gt: R, $lt: R}},
+ batchSize: R,
+ readConcern: {level: R, provenance: R},
+ },
+ namespace: `test.${coll.getName()}`,
+ readConcern: {level: "local", provenance: "implicitDefault"},
+ applicationName: "MongoDB Shell"
+ };
+
+ const cursor1 = coll.find({v: {$gt: 0, $lt: 5}}).batchSize(1); // returns 1 doc
+ const cursor2 = coll.find({v: {$gt: 0, $lt: 2}}).batchSize(1); // returns 1 doc
+
+ assert.commandWorked(
+ db.runCommand({killCursors: coll.getName(), cursors: [cursor1.getId(), cursor2.getId()]}));
+
+ const telemetry = getTelemetry(db);
+ assert.eq(1, telemetry.length);
+ assertExpectedResults(telemetry[0],
+ telemetryKey,
+ /* expectedExecCount */ 2,
+ /* expectedDocsReturnedSum */ 2,
+ /* expectedDocsReturnedMax */ 1,
+ /* expectedDocsReturnedMin */ 1,
+ /* expectedDocsReturnedSumOfSq */ 2);
+ st.stop();
+}
+
+// Assert on batchSize-limited agg queries that killCursors will write metrics with partial results
+// to the telemetry store.
+{
+ const st = setup();
+ const db = st.s.getDB("test");
+ const coll = db.coll;
+
+ const telemetryKey = {
+ pipeline: [{$match: {v: {$gt: R, $lt: R}}}],
+ namespace: `test.${coll.getName()}`,
+ applicationName: "MongoDB Shell"
+ };
+
+ const cursor1 = coll.aggregate(
+ [
+ {$match: {v: {$gt: 0, $lt: 5}}},
+ ],
+ {cursor: {batchSize: 1}}); // returns 1 doc
+ const cursor2 = coll.aggregate(
+ [
+ {$match: {v: {$gt: 0, $lt: 2}}},
+ ],
+ {cursor: {batchSize: 1}}); // returns 1 doc
+
+ assert.commandWorked(
+ db.runCommand({killCursors: coll.getName(), cursors: [cursor1.getId(), cursor2.getId()]}));
+
+ const telemetry = getTelemetry(db);
+ assert.eq(1, telemetry.length);
+ assertExpectedResults(telemetry[0],
+ telemetryKey,
+ /* expectedExecCount */ 2,
+ /* expectedDocsReturnedSum */ 2,
+ /* expectedDocsReturnedMax */ 1,
+ /* expectedDocsReturnedMin */ 1,
+ /* expectedDocsReturnedSumOfSq */ 2);
+ st.stop();
+}
}());
diff --git a/jstests/telemetry/telemetry_metrics_across_getMore_calls.js b/jstests/telemetry/telemetry_metrics_across_getMore_calls.js
index 2a411fe808f..89ed9a9156c 100644
--- a/jstests/telemetry/telemetry_metrics_across_getMore_calls.js
+++ b/jstests/telemetry/telemetry_metrics_across_getMore_calls.js
@@ -50,15 +50,9 @@ assert.commandWorked(bulk.execute());
assert.eq(telemetryEntry.metrics.execCount, 2);
// Assert telemetry values are accurate for the two above queries.
- assert.eq(telemetryEntry.metrics.docsScanned.sum, 2 * numDocs);
- assert.eq(telemetryEntry.metrics.docsScanned.min, numDocs);
- assert.eq(telemetryEntry.metrics.docsScanned.max, numDocs);
assert.eq(telemetryEntry.metrics.docsReturned.sum, numDocs);
assert.eq(telemetryEntry.metrics.docsReturned.min, numDocs / 2);
assert.eq(telemetryEntry.metrics.docsReturned.max, numDocs / 2);
- assert.eq(telemetryEntry.metrics.keysScanned.sum, 0);
- assert.eq(telemetryEntry.metrics.keysScanned.min, 0);
- assert.eq(telemetryEntry.metrics.keysScanned.max, 0);
verifyMetrics(telemetryResults);
}
@@ -70,7 +64,7 @@ const fooNeBatchSize = 3;
{
let cursor1 = coll.find({foo: {$eq: 0}}).batchSize(fooEqBatchSize);
let cursor2 = coll.find({foo: {$ne: 0}}).batchSize(fooNeBatchSize);
- // Issue one getMore for the first query, so 2 * cursor1BatchSize documents are returned total.
+ // Issue one getMore for the first query, so 2 * fooEqBatchSize documents are returned total.
assert.commandWorked(testDB.runCommand(
{getMore: cursor1.getId(), collection: coll.getName(), batchSize: fooEqBatchSize}));
@@ -112,11 +106,6 @@ const fooNeBatchSize = 3;
.toArray();
assert.eq(telemetryResults.length, 4, telemetryResults);
- // Assert no keys scanned when no index exists.
- assert.eq(telemetryResults[0].metrics.keysScanned.sum, 0);
- assert.eq(telemetryResults[1].metrics.keysScanned.sum, 0);
- assert.eq(telemetryResults[2].metrics.keysScanned.sum, 0);
- assert.eq(telemetryResults[3].metrics.keysScanned.sum, 0);
verifyMetrics(telemetryResults);
// This filters to just the telemetry for query coll.find().sort({"foo": 1}).batchSize(2).
@@ -164,25 +153,5 @@ const fooNeBatchSize = 3;
assert.eq(telemetryResults[0].metrics.docsReturned.min, 2 * fooEqBatchSize);
}
-// Ensure that for queries using an index, keys scanned is nonzero.
-{
- assert.commandWorked(coll.createIndex({bar: 1}));
- coll.aggregate([{$match: {$or: [{bar: 1, foo: 1}]}}], {cursor: {batchSize: 2}});
-
- // This filters telemetry entries to just the one entered for the above agg command.
- const telemetryResults =
- testDB.getSiblingDB("admin")
- .aggregate([
- {$telemetry: {}},
- {$match: {"key.pipeline.$match.$or": {$eq: [{'bar': '###', 'foo': '###'}]}}}
- ])
- .toArray();
- assert.eq(telemetryResults.length, 1, telemetryResults);
- assert.eq(telemetryResults[0].key.namespace, `test.${jsTestName()}`);
- assert.eq(telemetryResults[0].key.applicationName, "MongoDB Shell");
- assert.gt(telemetryResults[0].metrics.keysScanned.sum, 0);
- verifyMetrics(telemetryResults);
-}
-
MongoRunner.stopMongod(conn);
}());
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 6c74f6be1ec..0fd963220a3 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -157,13 +157,7 @@ void ClientCursor::dispose(OperationContext* opCtx, boost::optional<Date_t> now)
}
if (_telemetryStoreKey && opCtx) {
- telemetry::writeTelemetry(opCtx,
- _telemetryStoreKey,
- _queryOptMicros,
- _queryExecMicros,
- _docsReturned,
- _metrics.docsExamined.value_or(0),
- _metrics.keysExamined.value_or(0));
+ telemetry::writeTelemetry(opCtx, _telemetryStoreKey, _queryExecMicros, _docsReturned);
}
if (now) {
@@ -386,44 +380,27 @@ void startClientCursorMonitor() {
getClientCursorMonitor(getGlobalServiceContext()).go();
}
-void collectTelemetry(OperationContext* opCtx,
- boost::optional<ClientCursorPin&> pinnedCursor,
- bool isGetMoreOp) {
+void collectTelemetryMongod(OperationContext* opCtx, ClientCursorPin& pinnedCursor) {
auto&& opDebug = CurOp::get(opCtx)->debug();
- if (pinnedCursor) {
- auto cursor = pinnedCursor->getCursor();
- // TODO SERVER-73727 setting shouldRecordTelemetry to boost::none is only
- // temporarily necessary to avoid collecting metrics a second time in
- // CurOp::completeAndLogOperation
- opDebug.telemetryStoreKey = boost::none;
-
- // We have to use `elapsedTimeExcludingPauses` to count execution time since
- // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
- cursor->incCursorMetrics(opDebug.additiveMetrics,
- CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
- opDebug.nreturned);
-
- // If the current operation is a getMore, planning time was already aggregated on the
- // initial operation.
- if (!isGetMoreOp) {
- cursor->setQueryOptMicros(opDebug.planningTime.count());
- }
- } else {
- tassert(7301701, "getMore operations should aggregate metrics on the cursor", !isGetMoreOp);
- auto&& opDebug = CurOp::get(opCtx)->debug();
- // If we haven't registered a cursor to prepare for getMore requests, we record
- // telemetry directly.
- //
- // We have to use `elapsedTimeExcludingPauses` to count execution time since
- // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
- telemetry::writeTelemetry(opCtx,
- opDebug.telemetryStoreKey,
- opDebug.planningTime.count(),
- CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
- opDebug.nreturned,
- opDebug.additiveMetrics.docsExamined.value_or(0),
- opDebug.additiveMetrics.keysExamined.value_or(0));
- }
+
+ // We have to use `elapsedTimeExcludingPauses` to count execution time since
+ // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
+ pinnedCursor->incrementCursorMetrics(opDebug.additiveMetrics,
+ CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
+ opDebug.nreturned);
+}
+
+void collectTelemetryMongod(OperationContext* opCtx) {
+ auto&& opDebug = CurOp::get(opCtx)->debug();
+ // If we haven't registered a cursor to prepare for getMore requests, we record
+ // telemetry directly.
+ //
+ // We have to use `elapsedTimeExcludingPauses` to count execution time since
+ // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
+ telemetry::writeTelemetry(opCtx,
+ opDebug.telemetryStoreKey,
+ CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
+ opDebug.nreturned);
}
} // namespace mongo
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index a9bd095c2ce..11573fde1df 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -215,14 +215,9 @@ public:
_nReturnedSoFar = n;
}
- void setQueryOptMicros(uint64_t queryOptMicros) {
- tassert(7301700, "queryOptMicros should only be set once per cursor", _queryOptMicros == 0);
- _queryOptMicros = queryOptMicros;
- }
-
- void incCursorMetrics(OpDebug::AdditiveMetrics newMetrics,
- uint64_t newExecutionTime,
- uint64_t newDocsReturned) {
+ void incrementCursorMetrics(OpDebug::AdditiveMetrics newMetrics,
+ uint64_t newExecutionTime,
+ uint64_t newDocsReturned) {
_metrics.add(newMetrics);
_queryExecMicros += newExecutionTime;
_docsReturned += newDocsReturned;
@@ -467,7 +462,6 @@ private:
// If boost::none, telemetry should not be collected for this cursor.
boost::optional<BSONObj> _telemetryStoreKey;
// Metrics used for telemetry. TODO SERVER-73933 consider merging more into '_metrics'
- uint64_t _queryOptMicros = 0;
uint64_t _queryExecMicros = 0;
uint64_t _docsReturned = 0;
OpDebug::AdditiveMetrics _metrics;
@@ -604,8 +598,7 @@ void startClientCursorMonitor();
* provided, metrics are aggregated on the cursor; otherwise, metrics are written directly to the
* telemetry store.
*/
-void collectTelemetry(OperationContext* opCtx,
- boost::optional<ClientCursorPin&> cursor,
- bool isGetMoreOp);
+void collectTelemetryMongod(OperationContext* opCtx, ClientCursorPin& cursor);
+void collectTelemetryMongod(OperationContext* opCtx);
} // namespace mongo
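
A usage sketch for the two overloads declared just above (variable names are illustrative; the real call sites are in find_cmd.cpp and getmore_cmd.cpp further down): pass the pinned cursor whenever one will outlive the request, so metrics keep accumulating across getMores, and fall back to the no-cursor overload when the result set was returned in full.

// Sketch, not part of the patch:
if (cursorId != 0) {
    // A ClientCursor was registered for later getMores: add this batch's metrics to it.
    collectTelemetryMongod(opCtx, pinnedCursor);
} else {
    // Nothing outlives this request: write metrics straight to the telemetry store.
    collectTelemetryMongod(opCtx);
}
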
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 7d42b2ee70e..cf39d747f87 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -726,10 +726,10 @@ public:
CurOp::get(opCtx)->debug().storageStats =
opCtx->recoveryUnit()->computeOperationStatisticsSinceLastCall();
}
- collectTelemetry(opCtx, pinnedCursor, false);
+ collectTelemetryMongod(opCtx, pinnedCursor);
} else {
endQueryOp(opCtx, collection, *exec, numResults, cursorId);
- collectTelemetry(opCtx, boost::none, false);
+ collectTelemetryMongod(opCtx);
}
// Generate the response object to send to the client.
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 46fb19627ea..86f324150e9 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -695,7 +695,7 @@ public:
// response batch.
curOp->debug().nreturned = numResults;
- collectTelemetry(opCtx, cursorPin, true);
+ collectTelemetryMongod(opCtx, cursorPin);
if (respondWithId) {
cursorDeleter.dismiss();
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index d21f9801319..3c3e0240584 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -1180,8 +1180,11 @@ Status runAggregate(OperationContext* opCtx,
curOp->debug().setPlanSummaryMetrics(stats);
curOp->debug().nreturned = stats.nReturned;
- boost::optional<ClientCursorPin&> cursorForTelemetry = pins[0];
- collectTelemetry(opCtx, keepCursor ? cursorForTelemetry : boost::none, false);
+ if (keepCursor) {
+ collectTelemetryMongod(opCtx, pins[0]);
+ } else {
+ collectTelemetryMongod(opCtx);
+ }
// For an optimized away pipeline, signal the cache that a query operation has completed.
// For normal pipelines this is done in DocumentSourceCursor.
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 7581f999cb2..e868ba135dc 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -445,9 +445,6 @@ bool CurOp::completeAndLogOperation(logv2::LogComponent component,
_debug.executionTime = duration_cast<Microseconds>(elapsedTimeExcludingPauses());
const auto executionTimeMillis = durationCount<Milliseconds>(_debug.executionTime);
- // TODO SERVER-73727 remove telemetry collection here once metrics are aggregated in cursor
- telemetry::collectTelemetry(opCtx, CurOp::get(opCtx)->debug());
-
if (_debug.isReplOplogGetMore) {
oplogGetMoreStats.recordMillis(executionTimeMillis);
}
diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp
index a979f10da8d..f59a0c1e692 100644
--- a/src/mongo/db/query/telemetry.cpp
+++ b/src/mongo/db/query/telemetry.cpp
@@ -505,50 +505,6 @@ void throwIfEncounteringFLEPayload(const BSONElement& e) {
}
/**
- * Get the metrics for a given key holding the appropriate locks.
- */
-class LockedMetrics {
- LockedMetrics(TelemetryMetrics* metrics,
- TelemetryStore& telemetryStore,
- TelemetryStore::Partition partitionLock)
- : _metrics(metrics),
- _telemetryStore(telemetryStore),
- _partitionLock(std::move(partitionLock)) {}
-
-public:
- static LockedMetrics get(OperationContext* opCtx, const BSONObj& telemetryKey) {
- auto&& telemetryStore = getTelemetryStore(opCtx);
- auto&& [statusWithMetrics, partitionLock] =
- telemetryStore.getWithPartitionLock(telemetryKey);
- TelemetryMetrics* metrics;
- if (statusWithMetrics.isOK()) {
- metrics = statusWithMetrics.getValue();
- } else {
- size_t numEvicted = telemetryStore.put(telemetryKey, {}, partitionLock);
- telemetryEvictedMetric.increment(numEvicted);
- auto newMetrics = partitionLock->get(telemetryKey);
- // This can happen if the budget is immediately exceeded. Specifically if the there is
- // not enough room for a single new entry if the number of partitions is too high
- // relative to the size.
- tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
- metrics = &newMetrics.getValue()->second;
- }
- return LockedMetrics{metrics, telemetryStore, std::move(partitionLock)};
- }
-
- TelemetryMetrics* operator->() const {
- return _metrics;
- }
-
-private:
- TelemetryMetrics* _metrics;
-
- TelemetryStore& _telemetryStore;
-
- TelemetryStore::Partition _partitionLock;
-};
-
-/**
* Upon reading telemetry data, we redact some keys. This is the list. See
* TelemetryMetrics::redactKey().
*/
@@ -755,80 +711,38 @@ void registerFindRequest(const FindCommandRequest& request,
CurOp::get(opCtx)->debug().telemetryStoreKey = telemetryKey.obj();
}
-// TODO SERVER-73727 registerGetMoreRequest should be removed once metrics are aggregated on cursors
-// across getMores on mongos
-void registerGetMoreRequest(OperationContext* opCtx) {
- if (!isTelemetryEnabled()) {
- return;
- }
-
- // Rate limiting is important in all cases as it limits the amount of CPU telemetry uses to
- // prevent degrading query throughput. This is essential in the case of a large find
- // query with a batchsize of 1, where collecting metrics on every getMore would quickly impact
- // the number of queries the system can process per second (query throughput). This is why not
- // only are originating queries rate limited but also their subsequent getMore operations.
- if (!shouldCollect(opCtx->getServiceContext())) {
- return;
- }
-}
-
TelemetryStore& getTelemetryStore(OperationContext* opCtx) {
uassert(6579000, "Telemetry is not enabled without the feature flag on", isTelemetryEnabled());
return telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore();
}
-void recordExecution(OperationContext* opCtx, bool isFle) {
- if (!isTelemetryEnabled()) {
- return;
- }
- if (isFle) {
- return;
- }
- // Confirms that this is an operation the telemetry machinery has decided to collect metrics
- // from.
- auto&& opDebug = CurOp::get(opCtx)->debug();
- if (!opDebug.telemetryStoreKey) {
- return;
- }
- auto&& metrics = LockedMetrics::get(opCtx, *opDebug.telemetryStoreKey);
- metrics->execCount++;
- metrics->queryOptMicros.aggregate(opDebug.planningTime.count());
-}
-
-void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) {
- // Confirms that this is an operation the telemetry machinery has decided to collect metrics
- // from.
- if (!opDebug.telemetryStoreKey) {
- return;
- }
-
- auto&& metrics = LockedMetrics::get(opCtx, *opDebug.telemetryStoreKey);
- metrics->docsReturned.aggregate(opDebug.nreturned);
- metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0));
- metrics->keysScanned.aggregate(opDebug.additiveMetrics.keysExamined.value_or(0));
- metrics->lastExecutionMicros = opDebug.executionTime.count();
- metrics->queryExecMicros.aggregate(opDebug.executionTime.count());
-}
-
void writeTelemetry(OperationContext* opCtx,
boost::optional<BSONObj> telemetryKey,
- const uint64_t queryOptMicros,
const uint64_t queryExecMicros,
- const uint64_t docsReturned,
- const uint64_t docsScanned,
- const uint64_t keysScanned) {
+ const uint64_t docsReturned) {
if (!telemetryKey) {
return;
}
- auto&& metrics = LockedMetrics::get(opCtx, *telemetryKey);
+ auto&& telemetryStore = getTelemetryStore(opCtx);
+ auto&& [statusWithMetrics, partitionLock] = telemetryStore.getWithPartitionLock(*telemetryKey);
+ TelemetryMetrics* metrics;
+ if (statusWithMetrics.isOK()) {
+ metrics = statusWithMetrics.getValue();
+ } else {
+ size_t numEvicted = telemetryStore.put(*telemetryKey, {}, partitionLock);
+ telemetryEvictedMetric.increment(numEvicted);
+ auto newMetrics = partitionLock->get(*telemetryKey);
+        // This can happen if the budget is immediately exceeded. Specifically, this happens if
+        // there is not enough room for a single new entry because the number of partitions is too
+        // high relative to the size.
+ tassert(7064700, "Should find telemetry store entry", newMetrics.isOK());
+ metrics = &newMetrics.getValue()->second;
+ }
metrics->lastExecutionMicros = queryExecMicros;
metrics->execCount++;
- metrics->queryOptMicros.aggregate(queryOptMicros);
metrics->queryExecMicros.aggregate(queryExecMicros);
metrics->docsReturned.aggregate(docsReturned);
- metrics->docsScanned.aggregate(docsScanned);
- metrics->keysScanned.aggregate(keysScanned);
}
} // namespace telemetry
} // namespace mongo
diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h
index 340d502247b..eee56143a1b 100644
--- a/src/mongo/db/query/telemetry.h
+++ b/src/mongo/db/query/telemetry.h
@@ -131,11 +131,8 @@ public:
BSONObjBuilder builder{sizeof(TelemetryMetrics) + 100};
builder.append("lastExecutionMicros", (BSONNumeric)lastExecutionMicros);
builder.append("execCount", (BSONNumeric)execCount);
- queryOptMicros.appendTo(builder, "queryOptMicros");
queryExecMicros.appendTo(builder, "queryExecMicros");
docsReturned.appendTo(builder, "docsReturned");
- docsScanned.appendTo(builder, "docsScanned");
- keysScanned.appendTo(builder, "keysScanned");
builder.append("firstSeenTimestamp", firstSeenTimestamp);
return builder.obj();
}
@@ -160,16 +157,10 @@ public:
*/
uint64_t execCount = 0;
- AggregatedMetric queryOptMicros;
-
AggregatedMetric queryExecMicros;
AggregatedMetric docsReturned;
- AggregatedMetric docsScanned;
-
- AggregatedMetric keysScanned;
-
private:
/**
* We cache the redacted key the first time it's computed.
@@ -208,8 +199,8 @@ TelemetryStore& getTelemetryStore(OperationContext* opCtx);
* collect anything but this should be called for all requests. The decision is made based on
* the feature flag and telemetry parameters such as rate limiting.
*
- * The caller is still responsible for subsequently calling collectTelemetry() once the request
- * is completed.
+ * The caller is still responsible for subsequently calling writeTelemetry() once the request is
+ * completed.
*
* Note that calling this affects internal state. It should be called once for each request for
* which telemetry may be collected.
@@ -220,32 +211,12 @@ void registerFindRequest(const FindCommandRequest& request,
const NamespaceString& collection,
OperationContext* ocCtx);
-void registerGetMoreRequest(OperationContext* opCtx);
-
-// recordExecution is called between registering the query and collecting metrics post execution.
-// Its purpose is to track the number of times a given query shape has been ran. The execution count
-// is incremented outside of registering the command because the originating command could be an
-// explain request and therefore the query is not actually executed.
-// TODO SERVER-73727 remove this function
-void recordExecution(OperationContext* opCtx, bool isFle);
-
-/**
- * Collect telemetry for the operation identified by the telemetryKey stored on
- * opDebug.
- * TODO SERVER-73727 remove this function
- */
-void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug);
-
-
/**
* Writes telemetry to the telemetry store for the operation identified by `telemetryKey`.
*/
void writeTelemetry(OperationContext* opCtx,
boost::optional<BSONObj> telemetryKey,
- uint64_t queryOptMicros,
uint64_t queryExecMicros,
- uint64_t docsReturned,
- uint64_t docsScanned,
- uint64_t keysScanned);
+ uint64_t docsReturned);
} // namespace telemetry
} // namespace mongo
diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp
index 6e1762bbfdc..82c2aa8b57d 100644
--- a/src/mongo/db/query/telemetry_store_test.cpp
+++ b/src/mongo/db/query/telemetry_store_test.cpp
@@ -101,15 +101,15 @@ TEST_F(TelemetryStoreTest, EvictEntries) {
// This creates a telemetry store with 2 partitions, each with a size of 1200 bytes.
TelemetryStore telStore{2400, 2};
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 20; i++) {
auto query = BSON("query" + std::to_string(i) << 1 << "xEquals" << 42);
telStore.put(query, TelemetryMetrics{});
}
int numKeys = 0;
telStore.forEach([&](const BSONObj& key, const TelemetryMetrics& entry) { numKeys++; });
- // Given the size of the bson keys (~46 bytes) and values (~208 bytes), each partition (1200
- // bytes) can hold at most 4 entries.
- ASSERT_EQ(numKeys, 8);
+ // Given the size of the bson keys (~46 bytes) and values (~112 bytes), each partition (1200
+ // bytes) can hold at most 7 entries.
+ ASSERT_EQ(numKeys, 14);
}
/**
diff --git a/src/mongo/s/commands/cluster_find_cmd.h b/src/mongo/s/commands/cluster_find_cmd.h
index 52b1149e0fe..23b99fcea2d 100644
--- a/src/mongo/s/commands/cluster_find_cmd.h
+++ b/src/mongo/s/commands/cluster_find_cmd.h
@@ -250,7 +250,6 @@ public:
}
firstBatch.setPartialResultsReturned(partialResultsReturned);
firstBatch.done(cursorId, cq->nss());
- telemetry::recordExecution(opCtx, _didDoFLERewrite);
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
result->reset();
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 867e70d5a50..5955b8c9a75 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -565,8 +565,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
updateHostsTargetedMetrics(opCtx, namespaces.executionNss, cm, involvedNamespaces);
// Report usage statistics for each stage in the pipeline.
liteParsedPipeline.tickGlobalStageCounters();
- telemetry::recordExecution(opCtx, shouldDoFLERewrite);
-
// Add 'command' object to explain output.
if (expCtx->explain) {
explain_common::appendIfRoom(
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 621ed5af3f0..a16a783a14b 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -349,12 +349,24 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
}
+ bool exhausted = cursorState != ClusterCursorManager::CursorState::NotExhausted;
+ int nShards = ccc->getNumRemotes();
+
+ // Fill out the aggregation metrics in CurOp, and record telemetry metrics, before detaching the
+ // cursor from its opCtx.
+ CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards);
+ CurOp::get(opCtx)->debug().cursorExhausted = exhausted;
+ CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs();
+ if (exhausted) {
+ collectTelemetryMongos(opCtx);
+ } else {
+ collectTelemetryMongos(opCtx, ccc);
+ }
+
ccc->detachFromOperationContext();
- int nShards = ccc->getNumRemotes();
CursorId clusterCursorId = 0;
-
- if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
+ if (!exhausted) {
auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName();
clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
opCtx,
@@ -363,15 +375,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal,
authUser));
- }
-
- // Fill out the aggregation metrics in CurOp.
- if (clusterCursorId > 0) {
CurOp::get(opCtx)->debug().cursorid = clusterCursorId;
}
- CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards);
- CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0);
- CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs();
responseBuilder.done(clusterCursorId, requestedNss);
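
Worth noting about the hunk above: filling out CurOp and collecting telemetry were moved ahead of ccc->detachFromOperationContext() because, when the cursor is not exhausted, the guard is handed to the ClusterCursorManager right afterwards, so this is the last point where the operation's CurOp state and the cursor are both available. Condensed, the resulting order is (names as in the patch):

// Condensed from establishMergingMongosCursor():
CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs();
if (exhausted) {
    collectTelemetryMongos(opCtx);       // no cursor remains: write to the telemetry store now
} else {
    collectTelemetryMongos(opCtx, ccc);  // getMores will follow: aggregate on the cursor
}
ccc->detachFromOperationContext();       // a non-exhausted cursor is then registered with the manager
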
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 57c2fa7d100..a83b50f9287 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -221,6 +221,14 @@ public:
*/
virtual void incNBatches() = 0;
+ void incrementCursorMetrics(OpDebug::AdditiveMetrics newMetrics,
+ uint64_t newExecutionTime,
+ uint64_t newDocsReturned) {
+ _metrics.add(newMetrics);
+ _queryExecMicros += newExecutionTime;
+ _docsReturned += newDocsReturned;
+ }
+
//
// maxTimeMS support.
//
@@ -245,6 +253,12 @@ public:
_leftoverMaxTimeMicros = leftoverMaxTimeMicros;
}
+protected:
+ // Metrics used for telemetry. TODO SERVER-73933 consider merging more into '_metrics'
+ uint64_t _queryExecMicros = 0;
+ uint64_t _docsReturned = 0;
+ OpDebug::AdditiveMetrics _metrics;
+
private:
// Unused maxTime budget for this cursor.
Microseconds _leftoverMaxTimeMicros = Microseconds::max();
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 4f2a333c3be..9f7e32244cd 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -32,11 +32,15 @@
#include <memory>
#include "mongo/db/curop.h"
+#include "mongo/db/query/telemetry.h"
+#include "mongo/logv2/log.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
namespace mongo {
static CounterMetric mongosCursorStatsTotalOpened("mongos.cursor.totalOpened");
@@ -69,7 +73,8 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
_opCtx(opCtx),
_createdDate(opCtx->getServiceContext()->getPreciseClockSource()->now()),
_lastUseDate(_createdDate),
- _queryHash(CurOp::get(opCtx)->debug().queryHash) {
+ _queryHash(CurOp::get(opCtx)->debug().queryHash),
+ _telemetryStoreKey(CurOp::get(opCtx)->debug().telemetryStoreKey) {
dassert(!_params.compareWholeSortKeyOnRouter ||
SimpleBSONObjComparator::kInstance.evaluate(
_params.sortToApplyOnRouter == AsyncResultsMerger::kWholeSortKeySortPattern));
@@ -124,7 +129,20 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
}
void ClusterClientCursorImpl::kill(OperationContext* opCtx) {
+ if (_hasBeenKilled) {
+ LOGV2_DEBUG(7372700,
+ 3,
+ "Kill called on cluster client cursor after cursor has already been killed, so "
+ "ignoring");
+ return;
+ }
+
+ if (_telemetryStoreKey && opCtx) {
+ telemetry::writeTelemetry(opCtx, _telemetryStoreKey, _queryExecMicros, _docsReturned);
+ }
+
_root->kill(opCtx);
+ _hasBeenKilled = true;
}
void ClusterClientCursorImpl::reattachToOperationContext(OperationContext* opCtx) {
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 4020918761c..3b8dc7f38d5 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -180,6 +180,15 @@ private:
// Whether ClusterClientCursor::next() was interrupted due to MaxTimeMSExpired.
bool _maxTimeMSExpired = false;
+
+ // If boost::none, telemetry should not be collected for this cursor.
+ boost::optional<BSONObj> _telemetryStoreKey;
+
+ // Tracks if kill() has been called on the cursor. Multiple calls to kill() are treated as a
+ // noop.
+ // TODO SERVER-74482 investigate where kill() is called multiple times and remove unnecessary
+ // calls
+ bool _hasBeenKilled = false;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 2f5c3563342..38dbc2b0c56 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/allocate_cursor_id.h"
#include "mongo/db/curop.h"
#include "mongo/db/query/query_knobs_gen.h"
+#include "mongo/db/query/telemetry.h"
#include "mongo/db/session/kill_sessions_common.h"
#include "mongo/db/session/logical_session_cache.h"
#include "mongo/logv2/log.h"
@@ -586,4 +587,36 @@ StatusWith<ClusterClientCursorGuard> ClusterCursorManager::_detachCursor(WithLoc
return std::move(cursor);
}
+
+void collectTelemetryMongos(OperationContext* opCtx) {
+ auto&& opDebug = CurOp::get(opCtx)->debug();
+ // If we haven't registered a cursor to prepare for getMore requests, we record
+ // telemetry directly.
+ //
+ // We have to use `elapsedTimeExcludingPauses` to count execution time since
+ // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
+ telemetry::writeTelemetry(opCtx,
+ opDebug.telemetryStoreKey,
+ CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
+ opDebug.nreturned);
+}
+
+void collectTelemetryMongos(OperationContext* opCtx, ClusterClientCursorGuard& cursor) {
+ auto&& opDebug = CurOp::get(opCtx)->debug();
+ // We have to use `elapsedTimeExcludingPauses` to count execution time since
+ // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
+ cursor->incrementCursorMetrics(opDebug.additiveMetrics,
+ CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
+ opDebug.nreturned);
+}
+
+void collectTelemetryMongos(OperationContext* opCtx, ClusterCursorManager::PinnedCursor& cursor) {
+ auto&& opDebug = CurOp::get(opCtx)->debug();
+ // We have to use `elapsedTimeExcludingPauses` to count execution time since
+ // additiveMetrics.queryExecMicros isn't set until curOp is closing out.
+ cursor->incrementCursorMetrics(opDebug.additiveMetrics,
+ CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(),
+ opDebug.nreturned);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index a7d8c21cb7a..a3acf70d478 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -593,4 +593,13 @@ private:
size_t _cursorsTimedOut = 0;
};
+/**
+ * Aggregates telemetry for the current operation via metrics stored on opDebug. If a cursor is
+ * provided (via ClusterClientCursorGuard or ClusterCursorManager::PinnedCursor), metrics are
+ * aggregated on the cursor; otherwise, metrics are written directly to the telemetry store.
+ */
+void collectTelemetryMongos(OperationContext* opCtx);
+void collectTelemetryMongos(OperationContext* opCtx, ClusterClientCursorGuard& cursor);
+void collectTelemetryMongos(OperationContext* opCtx, ClusterCursorManager::PinnedCursor& cursor);
+
} // namespace mongo
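
Taken together with the mongod changes, these three overloads support the following mongos-side lifecycle (a sketch, not code from the patch):

// Initial find/aggregate: telemetry::registerFindRequest() (or the aggregate path's equivalent)
// puts the key on opDebug, and ClusterClientCursorImpl copies it into _telemetryStoreKey at
// construction. The first batch's metrics are then added with:
collectTelemetryMongos(opCtx, ccc);            // ClusterClientCursorGuard&, before registerCursor()
// Each getMore accumulates onto the checked-out cursor:
collectTelemetryMongos(opCtx, pinnedCursor);   // ClusterCursorManager::PinnedCursor&
// Results returned in full with no registered cursor go straight to the store:
collectTelemetryMongos(opCtx);
// Exhaustion or killCursors eventually reaches ClusterClientCursorImpl::kill(), which calls
// telemetry::writeTelemetry() once with the aggregated totals -- hence the partial-results
// assertions in telemetry_collect_on_mongos.js above.
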
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 4ef7f79b3f9..85c2a7a4f37 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -459,6 +459,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
if (shardIds.size() > 0) {
updateNumHostsTargetedMetrics(opCtx, cm, shardIds.size());
}
+ collectTelemetryMongos(opCtx);
return CursorId(0);
}
@@ -470,6 +471,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
: ClusterCursorManager::CursorLifetime::Mortal;
auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName();
ccc->incNBatches();
+ collectTelemetryMongos(opCtx, ccc);
auto cursorId = uassertStatusOK(cursorManager->registerCursor(
opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime, authUser));
@@ -850,8 +852,6 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitWithPinnedCursorDuringGetMoreBatch");
}
- telemetry::registerGetMoreRequest(opCtx);
-
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
StatusWith<ClusterQueryResult> next =
Status{ErrorCodes::InternalError, "uninitialized cluster query result"};
@@ -931,17 +931,19 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
postBatchResumeToken = pinnedCursor.getValue()->getPostBatchResumeToken();
}
+ // Set nReturned and whether the cursor has been exhausted.
+ CurOp::get(opCtx)->debug().cursorExhausted = (idToReturn == 0);
+ CurOp::get(opCtx)->debug().nreturned = batch.size();
+
const bool partialResultsReturned = pinnedCursor.getValue()->partialResultsReturned();
pinnedCursor.getValue()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
pinnedCursor.getValue()->incNBatches();
+ collectTelemetryMongos(opCtx, pinnedCursor.getValue());
+
// Upon successful completion, transfer ownership of the cursor back to the cursor manager. If
// the cursor has been exhausted, the cursor manager will clean it up for us.
pinnedCursor.getValue().returnCursor(cursorState);
- // Set nReturned and whether the cursor has been exhausted.
- CurOp::get(opCtx)->debug().cursorExhausted = (idToReturn == 0);
- CurOp::get(opCtx)->debug().nreturned = batch.size();
-
if (MONGO_unlikely(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch.shouldFail())) {
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch,
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index fcd1a89dd3d..ab36d7bdad2 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -97,6 +97,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
if (incomingCursorResponse.getValue().getCursorId() == CursorId(0)) {
CurOp::get(opCtx)->debug().cursorExhausted = true;
+ collectTelemetryMongos(opCtx);
return cmdResult;
}
@@ -129,6 +130,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(executor), std::move(params));
ccc->incNBatches();
+ collectTelemetryMongos(opCtx, ccc);
// We don't expect to use this cursor until a subsequent getMore, so detach from the current
// OperationContext until then.
ccc->detachFromOperationContext();