diff options
author | Will Buerger <will.buerger@mongodb.com> | 2023-03-01 17:27:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-01 21:26:19 +0000 |
commit | 12f2e3d221ac01156db60f307d433b56340115c3 (patch) | |
tree | 78cefa99ff44c7215c04d0a41777dcc7e90486a5 | |
parent | c80ca4373ac767037563691c9dd69f57258f2756 (diff) | |
download | mongo-12f2e3d221ac01156db60f307d433b56340115c3.tar.gz |
SERVER-73727: Aggregate mongos metrics in ClusterClientCursor
21 files changed, 403 insertions, 348 deletions
diff --git a/jstests/noPassthrough/telemetry_collect_on_mongos.js b/jstests/noPassthrough/telemetry_collect_on_mongos.js index b54a42f747e..2eee0e214d0 100644 --- a/jstests/noPassthrough/telemetry_collect_on_mongos.js +++ b/jstests/noPassthrough/telemetry_collect_on_mongos.js @@ -5,27 +5,29 @@ (function() { "use strict"; -const st = new ShardingTest({ - mongos: 1, - shards: 1, - config: 1, - rs: {nodes: 1}, - mongosOptions: { - setParameter: { - internalQueryConfigureTelemetrySamplingRate: 2147483647, - featureFlagTelemetry: true, - } - }, -}); - // Redacted literal replacement string. This may change in the future, so it's factored out. const R = "###"; -const mongos = st.s; -const db = mongos.getDB("test"); -const coll = db.coll; -coll.insert({v: 1}); -coll.insert({v: 4}); +const setup = () => { + const st = new ShardingTest({ + mongos: 1, + shards: 1, + config: 1, + rs: {nodes: 1}, + mongosOptions: { + setParameter: { + internalQueryConfigureTelemetrySamplingRate: 2147483647, + featureFlagTelemetry: true, + } + }, + }); + const mongos = st.s; + const db = mongos.getDB("test"); + const coll = db.coll; + coll.insert({v: 1}); + coll.insert({v: 4}); + return st; +}; // Get the telemetry for a given database, filtering out the actual $telemetry call. const getTelemetry = (conn) => { @@ -41,18 +43,26 @@ const getTelemetry = (conn) => { return result.cursor.firstBatch; }; -/** - * Verify that mongos is recording these metrics: - * - "firstSeenTimestamp" - * - "lastExecutionMicros" - * - "execCount" - * - "queryExecMicros" - * - "docsReturned" - */ +const assertExpectedResults = (results, + expectedTelemetryKey, + expectedExecCount, + expectedDocsReturnedSum, + expectedDocsReturnedMax, + expectedDocsReturnedMin, + expectedDocsReturnedSumOfSq) => { + const {key, metrics} = results; + assert.eq(expectedTelemetryKey, key); + assert.eq(expectedExecCount, metrics.execCount); + assert.docEq({ + sum: NumberLong(expectedDocsReturnedSum), + max: NumberLong(expectedDocsReturnedMax), + min: NumberLong(expectedDocsReturnedMin), + sumOfSquares: NumberLong(expectedDocsReturnedSumOfSq) + }, + metrics.docsReturned); -// This test can't predict exact timings, so just assert these three fields have been set (are -// non-zero). -const assertTelemetryMetricsSet = (metrics) => { + // This test can't predict exact timings, so just assert these three fields have been set (are + // non-zero). const {firstSeenTimestamp, lastExecutionMicros, queryExecMicros} = metrics; assert.neq(timestampCmp(firstSeenTimestamp, Timestamp(0, 0)), 0); @@ -64,82 +74,214 @@ const assertTelemetryMetricsSet = (metrics) => { } }; -coll.find({v: {$gt: 0, $lt: 5}}).toArray(); -coll.find({v: {$gt: 2, $lt: 3}}).toArray(); -coll.find({v: {$gt: 0, $lt: 1}}).toArray(); -coll.find({v: {$gt: 0, $lt: 2}}).toArray(); - +// Assert that, for find queries, no telemetry results are written until a cursor has reached +// exhaustion; ensure accurate results once they're written. { - const telemetry = getTelemetry(db); - assert.eq(1, telemetry.length); - const {key, metrics} = telemetry[0]; - const {docsReturned, execCount} = metrics; - assert.eq(4, execCount); - assert.eq( - { - find: { - find: R, - filter: {v: {$gt: R, $lt: R}}, - readConcern: {level: R, provenance: R}, - }, - namespace: `test.${coll.getName()}`, - readConcern: {level: "local", provenance: "implicitDefault"}, - applicationName: "MongoDB Shell" + const st = setup(); + const db = st.s.getDB("test"); + const coll = db.coll; + + const telemetryKey = { + find: { + find: R, + filter: {v: {$gt: R, $lt: R}}, + batchSize: R, + readConcern: {level: R, provenance: R}, }, - key, - ); - assert.eq({ - sum: NumberLong(3), - max: NumberLong(2), - min: NumberLong(0), - sumOfSquares: NumberLong(5), - }, - docsReturned); - assertTelemetryMetricsSet(metrics); -} + namespace: `test.${coll.getName()}`, + readConcern: {level: "local", provenance: "implicitDefault"}, + applicationName: "MongoDB Shell" + }; + + const cursor = coll.find({v: {$gt: 0, $lt: 5}}).batchSize(1); // returns 1 doc + + // Since the cursor hasn't been exhausted yet, ensure no telemetry results have been written + // yet. + let telemetry = getTelemetry(db); + assert.eq(0, telemetry.length); -coll.aggregate([ - {$match: {v: {$gt: 0, $lt: 5}}}, - {$project: {hello: "$world"}}, -]); -coll.aggregate([ - {$match: {v: {$gt: 0, $lt: 5}}}, - {$project: {hello: "$world"}}, -]); -coll.aggregate([ - {$match: {v: {$gt: 2, $lt: 3}}}, - {$project: {hello: "$universe"}}, -]); -coll.aggregate([ - {$match: {v: {$gt: 0, $lt: 2}}}, - {$project: {hello: "$galaxy"}}, -]); + // Run a getMore to exhaust the cursor, then ensure telemetry results have been written + // accurately. batchSize must be 2 so the cursor recognizes exhaustion. + assert.commandWorked(db.runCommand({ + getMore: cursor.getId(), + collection: coll.getName(), + batchSize: 2 + })); // returns 1 doc, exhausts the cursor + // The $telemetry query for the previous `getTelemetry` is included in this call to $telemetry. + telemetry = getTelemetry(db); + assert.eq(2, telemetry.length); + assertExpectedResults(telemetry[0], + telemetryKey, + /* expectedExecCount */ 1, + /* expectedDocsReturnedSum */ 2, + /* expectedDocsReturnedMax */ 2, + /* expectedDocsReturnedMin */ 2, + /* expectedDocsReturnedSumOfSq */ 4); + + // Run more queries (to exhaustion) with the same query shape, and ensure telemetry results are + // accurate. + coll.find({v: {$gt: 2, $lt: 3}}).batchSize(10).toArray(); // returns 0 docs + coll.find({v: {$gt: 0, $lt: 1}}).batchSize(10).toArray(); // returns 0 docs + coll.find({v: {$gt: 0, $lt: 2}}).batchSize(10).toArray(); // return 1 doc + telemetry = getTelemetry(db); + assert.eq(2, telemetry.length); + assertExpectedResults(telemetry[0], + telemetryKey, + /* expectedExecCount */ 4, + /* expectedDocsReturnedSum */ 3, + /* expectedDocsReturnedMax */ 2, + /* expectedDocsReturnedMin */ 0, + /* expectedDocsReturnedSumOfSq */ 5); + + st.stop(); +} +// Assert that, for agg queries, no telemetry results are written until a cursor has reached +// exhaustion; ensure accurate results once they're written. { - const telemetry = getTelemetry(mongos.getDB("test")); - assert.eq(3, telemetry.length); // The $telemetry query for the last test is included in this - // call to $telemetry. - const {key, metrics} = telemetry[1]; - const {docsReturned, execCount} = metrics; - assert.eq(4, execCount); - assert.eq({ - sum: NumberLong(5), - max: NumberLong(2), - min: NumberLong(0), - sumOfSquares: NumberLong(9), - }, - docsReturned); - assert.eq({ + const st = setup(); + const db = st.s.getDB("test"); + const coll = db.coll; + + const telemetryKey = { pipeline: [ {$match: {v: {$gt: R, $lt: R}}}, {$project: {hello: R}}, ], namespace: "test.coll", applicationName: "MongoDB Shell" - }, - key); - assertTelemetryMetricsSet(metrics); + }; + + const cursor = coll.aggregate( + [ + {$match: {v: {$gt: 0, $lt: 5}}}, + {$project: {hello: "$world"}}, + ], + {cursor: {batchSize: 1}}); // returns 1 doc + + // Since the cursor hasn't been exhausted yet, ensure no telemetry results have been written + // yet. + let telemetry = getTelemetry(db); + assert.eq(0, telemetry.length); + + // Run a getMore to exhaust the cursor, then ensure telemetry results have been written + // accurately. batchSize must be 2 so the cursor recognizes exhaustion. + assert.commandWorked(db.runCommand({ + getMore: cursor.getId(), + collection: coll.getName(), + batchSize: 2 + })); // returns 1 doc, exhausts the cursor + // The $telemetry query for the previous `getTelemetry` is included in this call to $telemetry. + telemetry = getTelemetry(db); + assert.eq(2, telemetry.length); + assertExpectedResults(telemetry[0], + telemetryKey, + /* expectedExecCount */ 1, + /* expectedDocsReturnedSum */ 2, + /* expectedDocsReturnedMax */ 2, + /* expectedDocsReturnedMin */ 2, + /* expectedDocsReturnedSumOfSq */ 4); + + // Run more queries (to exhaustion) with the same query shape, and ensure telemetry results are + // accurate. + coll.aggregate([ + {$match: {v: {$gt: 0, $lt: 5}}}, + {$project: {hello: "$world"}}, + ]); // returns 2 docs + coll.aggregate([ + {$match: {v: {$gt: 2, $lt: 3}}}, + {$project: {hello: "$universe"}}, + ]); // returns 0 docs + coll.aggregate([ + {$match: {v: {$gt: 0, $lt: 2}}}, + {$project: {hello: "$galaxy"}}, + ]); // returns 1 doc + telemetry = getTelemetry(db); + assert.eq(2, telemetry.length); + assertExpectedResults(telemetry[0], + telemetryKey, + /* expectedExecCount */ 4, + /* expectedDocsReturnedSum */ 5, + /* expectedDocsReturnedMax */ 2, + /* expectedDocsReturnedMin */ 0, + /* expectedDocsReturnedSumOfSq */ 9); + + st.stop(); } -st.stop(); +// Assert on batchSize-limited find queries that killCursors will write metrics with partial results +// to the telemetry store. +{ + const st = setup(); + const db = st.s.getDB("test"); + const coll = db.coll; + + const telemetryKey = { + find: { + find: R, + filter: {v: {$gt: R, $lt: R}}, + batchSize: R, + readConcern: {level: R, provenance: R}, + }, + namespace: `test.${coll.getName()}`, + readConcern: {level: "local", provenance: "implicitDefault"}, + applicationName: "MongoDB Shell" + }; + + const cursor1 = coll.find({v: {$gt: 0, $lt: 5}}).batchSize(1); // returns 1 doc + const cursor2 = coll.find({v: {$gt: 0, $lt: 2}}).batchSize(1); // returns 1 doc + + assert.commandWorked( + db.runCommand({killCursors: coll.getName(), cursors: [cursor1.getId(), cursor2.getId()]})); + + const telemetry = getTelemetry(db); + assert.eq(1, telemetry.length); + assertExpectedResults(telemetry[0], + telemetryKey, + /* expectedExecCount */ 2, + /* expectedDocsReturnedSum */ 2, + /* expectedDocsReturnedMax */ 1, + /* expectedDocsReturnedMin */ 1, + /* expectedDocsReturnedSumOfSq */ 2); + st.stop(); +} + +// Assert on batchSize-limited agg queries that killCursors will write metrics with partial results +// to the telemetry store. +{ + const st = setup(); + const db = st.s.getDB("test"); + const coll = db.coll; + + const telemetryKey = { + pipeline: [{$match: {v: {$gt: R, $lt: R}}}], + namespace: `test.${coll.getName()}`, + applicationName: "MongoDB Shell" + }; + + const cursor1 = coll.aggregate( + [ + {$match: {v: {$gt: 0, $lt: 5}}}, + ], + {cursor: {batchSize: 1}}); // returns 1 doc + const cursor2 = coll.aggregate( + [ + {$match: {v: {$gt: 0, $lt: 2}}}, + ], + {cursor: {batchSize: 1}}); // returns 1 doc + + assert.commandWorked( + db.runCommand({killCursors: coll.getName(), cursors: [cursor1.getId(), cursor2.getId()]})); + + const telemetry = getTelemetry(db); + assert.eq(1, telemetry.length); + assertExpectedResults(telemetry[0], + telemetryKey, + /* expectedExecCount */ 2, + /* expectedDocsReturnedSum */ 2, + /* expectedDocsReturnedMax */ 1, + /* expectedDocsReturnedMin */ 1, + /* expectedDocsReturnedSumOfSq */ 2); + st.stop(); +} }()); diff --git a/jstests/telemetry/telemetry_metrics_across_getMore_calls.js b/jstests/telemetry/telemetry_metrics_across_getMore_calls.js index 2a411fe808f..89ed9a9156c 100644 --- a/jstests/telemetry/telemetry_metrics_across_getMore_calls.js +++ b/jstests/telemetry/telemetry_metrics_across_getMore_calls.js @@ -50,15 +50,9 @@ assert.commandWorked(bulk.execute()); assert.eq(telemetryEntry.metrics.execCount, 2); // Assert telemetry values are accurate for the two above queries. - assert.eq(telemetryEntry.metrics.docsScanned.sum, 2 * numDocs); - assert.eq(telemetryEntry.metrics.docsScanned.min, numDocs); - assert.eq(telemetryEntry.metrics.docsScanned.max, numDocs); assert.eq(telemetryEntry.metrics.docsReturned.sum, numDocs); assert.eq(telemetryEntry.metrics.docsReturned.min, numDocs / 2); assert.eq(telemetryEntry.metrics.docsReturned.max, numDocs / 2); - assert.eq(telemetryEntry.metrics.keysScanned.sum, 0); - assert.eq(telemetryEntry.metrics.keysScanned.min, 0); - assert.eq(telemetryEntry.metrics.keysScanned.max, 0); verifyMetrics(telemetryResults); } @@ -70,7 +64,7 @@ const fooNeBatchSize = 3; { let cursor1 = coll.find({foo: {$eq: 0}}).batchSize(fooEqBatchSize); let cursor2 = coll.find({foo: {$ne: 0}}).batchSize(fooNeBatchSize); - // Issue one getMore for the first query, so 2 * cursor1BatchSize documents are returned total. + // Issue one getMore for the first query, so 2 * fooEqBatchSize documents are returned total. assert.commandWorked(testDB.runCommand( {getMore: cursor1.getId(), collection: coll.getName(), batchSize: fooEqBatchSize})); @@ -112,11 +106,6 @@ const fooNeBatchSize = 3; .toArray(); assert.eq(telemetryResults.length, 4, telemetryResults); - // Assert no keys scanned when no index exists. - assert.eq(telemetryResults[0].metrics.keysScanned.sum, 0); - assert.eq(telemetryResults[1].metrics.keysScanned.sum, 0); - assert.eq(telemetryResults[2].metrics.keysScanned.sum, 0); - assert.eq(telemetryResults[3].metrics.keysScanned.sum, 0); verifyMetrics(telemetryResults); // This filters to just the telemetry for query coll.find().sort({"foo": 1}).batchSize(2). @@ -164,25 +153,5 @@ const fooNeBatchSize = 3; assert.eq(telemetryResults[0].metrics.docsReturned.min, 2 * fooEqBatchSize); } -// Ensure that for queries using an index, keys scanned is nonzero. -{ - assert.commandWorked(coll.createIndex({bar: 1})); - coll.aggregate([{$match: {$or: [{bar: 1, foo: 1}]}}], {cursor: {batchSize: 2}}); - - // This filters telemetry entries to just the one entered for the above agg command. - const telemetryResults = - testDB.getSiblingDB("admin") - .aggregate([ - {$telemetry: {}}, - {$match: {"key.pipeline.$match.$or": {$eq: [{'bar': '###', 'foo': '###'}]}}} - ]) - .toArray(); - assert.eq(telemetryResults.length, 1, telemetryResults); - assert.eq(telemetryResults[0].key.namespace, `test.${jsTestName()}`); - assert.eq(telemetryResults[0].key.applicationName, "MongoDB Shell"); - assert.gt(telemetryResults[0].metrics.keysScanned.sum, 0); - verifyMetrics(telemetryResults); -} - MongoRunner.stopMongod(conn); }()); diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 6c74f6be1ec..0fd963220a3 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -157,13 +157,7 @@ void ClientCursor::dispose(OperationContext* opCtx, boost::optional<Date_t> now) } if (_telemetryStoreKey && opCtx) { - telemetry::writeTelemetry(opCtx, - _telemetryStoreKey, - _queryOptMicros, - _queryExecMicros, - _docsReturned, - _metrics.docsExamined.value_or(0), - _metrics.keysExamined.value_or(0)); + telemetry::writeTelemetry(opCtx, _telemetryStoreKey, _queryExecMicros, _docsReturned); } if (now) { @@ -386,44 +380,27 @@ void startClientCursorMonitor() { getClientCursorMonitor(getGlobalServiceContext()).go(); } -void collectTelemetry(OperationContext* opCtx, - boost::optional<ClientCursorPin&> pinnedCursor, - bool isGetMoreOp) { +void collectTelemetryMongod(OperationContext* opCtx, ClientCursorPin& pinnedCursor) { auto&& opDebug = CurOp::get(opCtx)->debug(); - if (pinnedCursor) { - auto cursor = pinnedCursor->getCursor(); - // TODO SERVER-73727 setting shouldRecordTelemetry to boost::none is only - // temporarily necessary to avoid collecting metrics a second time in - // CurOp::completeAndLogOperation - opDebug.telemetryStoreKey = boost::none; - - // We have to use `elapsedTimeExcludingPauses` to count execution time since - // additiveMetrics.queryExecMicros isn't set until curOp is closing out. - cursor->incCursorMetrics(opDebug.additiveMetrics, - CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), - opDebug.nreturned); - - // If the current operation is a getMore, planning time was already aggregated on the - // initial operation. - if (!isGetMoreOp) { - cursor->setQueryOptMicros(opDebug.planningTime.count()); - } - } else { - tassert(7301701, "getMore operations should aggregate metrics on the cursor", !isGetMoreOp); - auto&& opDebug = CurOp::get(opCtx)->debug(); - // If we haven't registered a cursor to prepare for getMore requests, we record - // telemetry directly. - // - // We have to use `elapsedTimeExcludingPauses` to count execution time since - // additiveMetrics.queryExecMicros isn't set until curOp is closing out. - telemetry::writeTelemetry(opCtx, - opDebug.telemetryStoreKey, - opDebug.planningTime.count(), - CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), - opDebug.nreturned, - opDebug.additiveMetrics.docsExamined.value_or(0), - opDebug.additiveMetrics.keysExamined.value_or(0)); - } + + // We have to use `elapsedTimeExcludingPauses` to count execution time since + // additiveMetrics.queryExecMicros isn't set until curOp is closing out. + pinnedCursor->incrementCursorMetrics(opDebug.additiveMetrics, + CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), + opDebug.nreturned); +} + +void collectTelemetryMongod(OperationContext* opCtx) { + auto&& opDebug = CurOp::get(opCtx)->debug(); + // If we haven't registered a cursor to prepare for getMore requests, we record + // telemetry directly. + // + // We have to use `elapsedTimeExcludingPauses` to count execution time since + // additiveMetrics.queryExecMicros isn't set until curOp is closing out. + telemetry::writeTelemetry(opCtx, + opDebug.telemetryStoreKey, + CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), + opDebug.nreturned); } } // namespace mongo diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index a9bd095c2ce..11573fde1df 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -215,14 +215,9 @@ public: _nReturnedSoFar = n; } - void setQueryOptMicros(uint64_t queryOptMicros) { - tassert(7301700, "queryOptMicros should only be set once per cursor", _queryOptMicros == 0); - _queryOptMicros = queryOptMicros; - } - - void incCursorMetrics(OpDebug::AdditiveMetrics newMetrics, - uint64_t newExecutionTime, - uint64_t newDocsReturned) { + void incrementCursorMetrics(OpDebug::AdditiveMetrics newMetrics, + uint64_t newExecutionTime, + uint64_t newDocsReturned) { _metrics.add(newMetrics); _queryExecMicros += newExecutionTime; _docsReturned += newDocsReturned; @@ -467,7 +462,6 @@ private: // If boost::none, telemetry should not be collected for this cursor. boost::optional<BSONObj> _telemetryStoreKey; // Metrics used for telemetry. TODO SERVER-73933 consider merging more into '_metrics' - uint64_t _queryOptMicros = 0; uint64_t _queryExecMicros = 0; uint64_t _docsReturned = 0; OpDebug::AdditiveMetrics _metrics; @@ -604,8 +598,7 @@ void startClientCursorMonitor(); * provided, metrics are aggregated on the cursor; otherwise, metrics are written directly to the * telemetry store. */ -void collectTelemetry(OperationContext* opCtx, - boost::optional<ClientCursorPin&> cursor, - bool isGetMoreOp); +void collectTelemetryMongod(OperationContext* opCtx, ClientCursorPin& cursor); +void collectTelemetryMongod(OperationContext* opCtx); } // namespace mongo diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 7d42b2ee70e..cf39d747f87 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -726,10 +726,10 @@ public: CurOp::get(opCtx)->debug().storageStats = opCtx->recoveryUnit()->computeOperationStatisticsSinceLastCall(); } - collectTelemetry(opCtx, pinnedCursor, false); + collectTelemetryMongod(opCtx, pinnedCursor); } else { endQueryOp(opCtx, collection, *exec, numResults, cursorId); - collectTelemetry(opCtx, boost::none, false); + collectTelemetryMongod(opCtx); } // Generate the response object to send to the client. diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 46fb19627ea..86f324150e9 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -695,7 +695,7 @@ public: // response batch. curOp->debug().nreturned = numResults; - collectTelemetry(opCtx, cursorPin, true); + collectTelemetryMongod(opCtx, cursorPin); if (respondWithId) { cursorDeleter.dismiss(); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index d21f9801319..3c3e0240584 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -1180,8 +1180,11 @@ Status runAggregate(OperationContext* opCtx, curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; - boost::optional<ClientCursorPin&> cursorForTelemetry = pins[0]; - collectTelemetry(opCtx, keepCursor ? cursorForTelemetry : boost::none, false); + if (keepCursor) { + collectTelemetryMongod(opCtx, pins[0]); + } else { + collectTelemetryMongod(opCtx); + } // For an optimized away pipeline, signal the cache that a query operation has completed. // For normal pipelines this is done in DocumentSourceCursor. diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp index 7581f999cb2..e868ba135dc 100644 --- a/src/mongo/db/curop.cpp +++ b/src/mongo/db/curop.cpp @@ -445,9 +445,6 @@ bool CurOp::completeAndLogOperation(logv2::LogComponent component, _debug.executionTime = duration_cast<Microseconds>(elapsedTimeExcludingPauses()); const auto executionTimeMillis = durationCount<Milliseconds>(_debug.executionTime); - // TODO SERVER-73727 remove telemetry collection here once metrics are aggregated in cursor - telemetry::collectTelemetry(opCtx, CurOp::get(opCtx)->debug()); - if (_debug.isReplOplogGetMore) { oplogGetMoreStats.recordMillis(executionTimeMillis); } diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index a979f10da8d..f59a0c1e692 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -505,50 +505,6 @@ void throwIfEncounteringFLEPayload(const BSONElement& e) { } /** - * Get the metrics for a given key holding the appropriate locks. - */ -class LockedMetrics { - LockedMetrics(TelemetryMetrics* metrics, - TelemetryStore& telemetryStore, - TelemetryStore::Partition partitionLock) - : _metrics(metrics), - _telemetryStore(telemetryStore), - _partitionLock(std::move(partitionLock)) {} - -public: - static LockedMetrics get(OperationContext* opCtx, const BSONObj& telemetryKey) { - auto&& telemetryStore = getTelemetryStore(opCtx); - auto&& [statusWithMetrics, partitionLock] = - telemetryStore.getWithPartitionLock(telemetryKey); - TelemetryMetrics* metrics; - if (statusWithMetrics.isOK()) { - metrics = statusWithMetrics.getValue(); - } else { - size_t numEvicted = telemetryStore.put(telemetryKey, {}, partitionLock); - telemetryEvictedMetric.increment(numEvicted); - auto newMetrics = partitionLock->get(telemetryKey); - // This can happen if the budget is immediately exceeded. Specifically if the there is - // not enough room for a single new entry if the number of partitions is too high - // relative to the size. - tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); - metrics = &newMetrics.getValue()->second; - } - return LockedMetrics{metrics, telemetryStore, std::move(partitionLock)}; - } - - TelemetryMetrics* operator->() const { - return _metrics; - } - -private: - TelemetryMetrics* _metrics; - - TelemetryStore& _telemetryStore; - - TelemetryStore::Partition _partitionLock; -}; - -/** * Upon reading telemetry data, we redact some keys. This is the list. See * TelemetryMetrics::redactKey(). */ @@ -755,80 +711,38 @@ void registerFindRequest(const FindCommandRequest& request, CurOp::get(opCtx)->debug().telemetryStoreKey = telemetryKey.obj(); } -// TODO SERVER-73727 registerGetMoreRequest should be removed once metrics are aggregated on cursors -// across getMores on mongos -void registerGetMoreRequest(OperationContext* opCtx) { - if (!isTelemetryEnabled()) { - return; - } - - // Rate limiting is important in all cases as it limits the amount of CPU telemetry uses to - // prevent degrading query throughput. This is essential in the case of a large find - // query with a batchsize of 1, where collecting metrics on every getMore would quickly impact - // the number of queries the system can process per second (query throughput). This is why not - // only are originating queries rate limited but also their subsequent getMore operations. - if (!shouldCollect(opCtx->getServiceContext())) { - return; - } -} - TelemetryStore& getTelemetryStore(OperationContext* opCtx) { uassert(6579000, "Telemetry is not enabled without the feature flag on", isTelemetryEnabled()); return telemetryStoreDecoration(opCtx->getServiceContext())->getTelemetryStore(); } -void recordExecution(OperationContext* opCtx, bool isFle) { - if (!isTelemetryEnabled()) { - return; - } - if (isFle) { - return; - } - // Confirms that this is an operation the telemetry machinery has decided to collect metrics - // from. - auto&& opDebug = CurOp::get(opCtx)->debug(); - if (!opDebug.telemetryStoreKey) { - return; - } - auto&& metrics = LockedMetrics::get(opCtx, *opDebug.telemetryStoreKey); - metrics->execCount++; - metrics->queryOptMicros.aggregate(opDebug.planningTime.count()); -} - -void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) { - // Confirms that this is an operation the telemetry machinery has decided to collect metrics - // from. - if (!opDebug.telemetryStoreKey) { - return; - } - - auto&& metrics = LockedMetrics::get(opCtx, *opDebug.telemetryStoreKey); - metrics->docsReturned.aggregate(opDebug.nreturned); - metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0)); - metrics->keysScanned.aggregate(opDebug.additiveMetrics.keysExamined.value_or(0)); - metrics->lastExecutionMicros = opDebug.executionTime.count(); - metrics->queryExecMicros.aggregate(opDebug.executionTime.count()); -} - void writeTelemetry(OperationContext* opCtx, boost::optional<BSONObj> telemetryKey, - const uint64_t queryOptMicros, const uint64_t queryExecMicros, - const uint64_t docsReturned, - const uint64_t docsScanned, - const uint64_t keysScanned) { + const uint64_t docsReturned) { if (!telemetryKey) { return; } - auto&& metrics = LockedMetrics::get(opCtx, *telemetryKey); + auto&& telemetryStore = getTelemetryStore(opCtx); + auto&& [statusWithMetrics, partitionLock] = telemetryStore.getWithPartitionLock(*telemetryKey); + TelemetryMetrics* metrics; + if (statusWithMetrics.isOK()) { + metrics = statusWithMetrics.getValue(); + } else { + size_t numEvicted = telemetryStore.put(*telemetryKey, {}, partitionLock); + telemetryEvictedMetric.increment(numEvicted); + auto newMetrics = partitionLock->get(*telemetryKey); + // This can happen if the budget is immediately exceeded. Specifically if the there is + // not enough room for a single new entry if the number of partitions is too high + // relative to the size. + tassert(7064700, "Should find telemetry store entry", newMetrics.isOK()); + metrics = &newMetrics.getValue()->second; + } metrics->lastExecutionMicros = queryExecMicros; metrics->execCount++; - metrics->queryOptMicros.aggregate(queryOptMicros); metrics->queryExecMicros.aggregate(queryExecMicros); metrics->docsReturned.aggregate(docsReturned); - metrics->docsScanned.aggregate(docsScanned); - metrics->keysScanned.aggregate(keysScanned); } } // namespace telemetry } // namespace mongo diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h index 340d502247b..eee56143a1b 100644 --- a/src/mongo/db/query/telemetry.h +++ b/src/mongo/db/query/telemetry.h @@ -131,11 +131,8 @@ public: BSONObjBuilder builder{sizeof(TelemetryMetrics) + 100}; builder.append("lastExecutionMicros", (BSONNumeric)lastExecutionMicros); builder.append("execCount", (BSONNumeric)execCount); - queryOptMicros.appendTo(builder, "queryOptMicros"); queryExecMicros.appendTo(builder, "queryExecMicros"); docsReturned.appendTo(builder, "docsReturned"); - docsScanned.appendTo(builder, "docsScanned"); - keysScanned.appendTo(builder, "keysScanned"); builder.append("firstSeenTimestamp", firstSeenTimestamp); return builder.obj(); } @@ -160,16 +157,10 @@ public: */ uint64_t execCount = 0; - AggregatedMetric queryOptMicros; - AggregatedMetric queryExecMicros; AggregatedMetric docsReturned; - AggregatedMetric docsScanned; - - AggregatedMetric keysScanned; - private: /** * We cache the redacted key the first time it's computed. @@ -208,8 +199,8 @@ TelemetryStore& getTelemetryStore(OperationContext* opCtx); * collect anything but this should be called for all requests. The decision is made based on * the feature flag and telemetry parameters such as rate limiting. * - * The caller is still responsible for subsequently calling collectTelemetry() once the request - * is completed. + * The caller is still responsible for subsequently calling writeTelemetry() once the request is + * completed. * * Note that calling this affects internal state. It should be called once for each request for * which telemetry may be collected. @@ -220,32 +211,12 @@ void registerFindRequest(const FindCommandRequest& request, const NamespaceString& collection, OperationContext* ocCtx); -void registerGetMoreRequest(OperationContext* opCtx); - -// recordExecution is called between registering the query and collecting metrics post execution. -// Its purpose is to track the number of times a given query shape has been ran. The execution count -// is incremented outside of registering the command because the originating command could be an -// explain request and therefore the query is not actually executed. -// TODO SERVER-73727 remove this function -void recordExecution(OperationContext* opCtx, bool isFle); - -/** - * Collect telemetry for the operation identified by the telemetryKey stored on - * opDebug. - * TODO SERVER-73727 remove this function - */ -void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug); - - /** * Writes telemetry to the telemetry store for the operation identified by `telemetryKey`. */ void writeTelemetry(OperationContext* opCtx, boost::optional<BSONObj> telemetryKey, - uint64_t queryOptMicros, uint64_t queryExecMicros, - uint64_t docsReturned, - uint64_t docsScanned, - uint64_t keysScanned); + uint64_t docsReturned); } // namespace telemetry } // namespace mongo diff --git a/src/mongo/db/query/telemetry_store_test.cpp b/src/mongo/db/query/telemetry_store_test.cpp index 6e1762bbfdc..82c2aa8b57d 100644 --- a/src/mongo/db/query/telemetry_store_test.cpp +++ b/src/mongo/db/query/telemetry_store_test.cpp @@ -101,15 +101,15 @@ TEST_F(TelemetryStoreTest, EvictEntries) { // This creates a telemetry store with 2 partitions, each with a size of 1200 bytes. TelemetryStore telStore{2400, 2}; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 20; i++) { auto query = BSON("query" + std::to_string(i) << 1 << "xEquals" << 42); telStore.put(query, TelemetryMetrics{}); } int numKeys = 0; telStore.forEach([&](const BSONObj& key, const TelemetryMetrics& entry) { numKeys++; }); - // Given the size of the bson keys (~46 bytes) and values (~208 bytes), each partition (1200 - // bytes) can hold at most 4 entries. - ASSERT_EQ(numKeys, 8); + // Given the size of the bson keys (~46 bytes) and values (~112 bytes), each partition (1200 + // bytes) can hold at most 7 entries. + ASSERT_EQ(numKeys, 14); } /** diff --git a/src/mongo/s/commands/cluster_find_cmd.h b/src/mongo/s/commands/cluster_find_cmd.h index 52b1149e0fe..23b99fcea2d 100644 --- a/src/mongo/s/commands/cluster_find_cmd.h +++ b/src/mongo/s/commands/cluster_find_cmd.h @@ -250,7 +250,6 @@ public: } firstBatch.setPartialResultsReturned(partialResultsReturned); firstBatch.done(cursorId, cq->nss()); - telemetry::recordExecution(opCtx, _didDoFLERewrite); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { result->reset(); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 867e70d5a50..5955b8c9a75 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -565,8 +565,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, updateHostsTargetedMetrics(opCtx, namespaces.executionNss, cm, involvedNamespaces); // Report usage statistics for each stage in the pipeline. liteParsedPipeline.tickGlobalStageCounters(); - telemetry::recordExecution(opCtx, shouldDoFLERewrite); - // Add 'command' object to explain output. if (expCtx->explain) { explain_common::appendIfRoom( diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 621ed5af3f0..a16a783a14b 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -349,12 +349,24 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); } + bool exhausted = cursorState != ClusterCursorManager::CursorState::NotExhausted; + int nShards = ccc->getNumRemotes(); + + // Fill out the aggregation metrics in CurOp, and record telemetry metrics, before detaching the + // cursor from its opCtx. + CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards); + CurOp::get(opCtx)->debug().cursorExhausted = exhausted; + CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs(); + if (exhausted) { + collectTelemetryMongos(opCtx); + } else { + collectTelemetryMongos(opCtx, ccc); + } + ccc->detachFromOperationContext(); - int nShards = ccc->getNumRemotes(); CursorId clusterCursorId = 0; - - if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { + if (!exhausted) { auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(); clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( opCtx, @@ -363,15 +375,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal, authUser)); - } - - // Fill out the aggregation metrics in CurOp. - if (clusterCursorId > 0) { CurOp::get(opCtx)->debug().cursorid = clusterCursorId; } - CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards); - CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0); - CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs(); responseBuilder.done(clusterCursorId, requestedNss); diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 57c2fa7d100..a83b50f9287 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -221,6 +221,14 @@ public: */ virtual void incNBatches() = 0; + void incrementCursorMetrics(OpDebug::AdditiveMetrics newMetrics, + uint64_t newExecutionTime, + uint64_t newDocsReturned) { + _metrics.add(newMetrics); + _queryExecMicros += newExecutionTime; + _docsReturned += newDocsReturned; + } + // // maxTimeMS support. // @@ -245,6 +253,12 @@ public: _leftoverMaxTimeMicros = leftoverMaxTimeMicros; } +protected: + // Metrics used for telemetry. TODO SERVER-73933 consider merging more into '_metrics' + uint64_t _queryExecMicros = 0; + uint64_t _docsReturned = 0; + OpDebug::AdditiveMetrics _metrics; + private: // Unused maxTime budget for this cursor. Microseconds _leftoverMaxTimeMicros = Microseconds::max(); diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 4f2a333c3be..9f7e32244cd 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -32,11 +32,15 @@ #include <memory> #include "mongo/db/curop.h" +#include "mongo/db/query/telemetry.h" +#include "mongo/logv2/log.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + namespace mongo { static CounterMetric mongosCursorStatsTotalOpened("mongos.cursor.totalOpened"); @@ -69,7 +73,8 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, _opCtx(opCtx), _createdDate(opCtx->getServiceContext()->getPreciseClockSource()->now()), _lastUseDate(_createdDate), - _queryHash(CurOp::get(opCtx)->debug().queryHash) { + _queryHash(CurOp::get(opCtx)->debug().queryHash), + _telemetryStoreKey(CurOp::get(opCtx)->debug().telemetryStoreKey) { dassert(!_params.compareWholeSortKeyOnRouter || SimpleBSONObjComparator::kInstance.evaluate( _params.sortToApplyOnRouter == AsyncResultsMerger::kWholeSortKeySortPattern)); @@ -124,7 +129,20 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() { } void ClusterClientCursorImpl::kill(OperationContext* opCtx) { + if (_hasBeenKilled) { + LOGV2_DEBUG(7372700, + 3, + "Kill called on cluster client cursor after cursor has already been killed, so " + "ignoring"); + return; + } + + if (_telemetryStoreKey && opCtx) { + telemetry::writeTelemetry(opCtx, _telemetryStoreKey, _queryExecMicros, _docsReturned); + } + _root->kill(opCtx); + _hasBeenKilled = true; } void ClusterClientCursorImpl::reattachToOperationContext(OperationContext* opCtx) { diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 4020918761c..3b8dc7f38d5 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -180,6 +180,15 @@ private: // Whether ClusterClientCursor::next() was interrupted due to MaxTimeMSExpired. bool _maxTimeMSExpired = false; + + // If boost::none, telemetry should not be collected for this cursor. + boost::optional<BSONObj> _telemetryStoreKey; + + // Tracks if kill() has been called on the cursor. Multiple calls to kill() are treated as a + // noop. + // TODO SERVER-74482 investigate where kill() is called multiple times and remove unnecessary + // calls + bool _hasBeenKilled = false; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 2f5c3563342..38dbc2b0c56 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -38,6 +38,7 @@ #include "mongo/db/allocate_cursor_id.h" #include "mongo/db/curop.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/query/telemetry.h" #include "mongo/db/session/kill_sessions_common.h" #include "mongo/db/session/logical_session_cache.h" #include "mongo/logv2/log.h" @@ -586,4 +587,36 @@ StatusWith<ClusterClientCursorGuard> ClusterCursorManager::_detachCursor(WithLoc return std::move(cursor); } + +void collectTelemetryMongos(OperationContext* opCtx) { + auto&& opDebug = CurOp::get(opCtx)->debug(); + // If we haven't registered a cursor to prepare for getMore requests, we record + // telemetry directly. + // + // We have to use `elapsedTimeExcludingPauses` to count execution time since + // additiveMetrics.queryExecMicros isn't set until curOp is closing out. + telemetry::writeTelemetry(opCtx, + opDebug.telemetryStoreKey, + CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), + opDebug.nreturned); +} + +void collectTelemetryMongos(OperationContext* opCtx, ClusterClientCursorGuard& cursor) { + auto&& opDebug = CurOp::get(opCtx)->debug(); + // We have to use `elapsedTimeExcludingPauses` to count execution time since + // additiveMetrics.queryExecMicros isn't set until curOp is closing out. + cursor->incrementCursorMetrics(opDebug.additiveMetrics, + CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), + opDebug.nreturned); +} + +void collectTelemetryMongos(OperationContext* opCtx, ClusterCursorManager::PinnedCursor& cursor) { + auto&& opDebug = CurOp::get(opCtx)->debug(); + // We have to use `elapsedTimeExcludingPauses` to count execution time since + // additiveMetrics.queryExecMicros isn't set until curOp is closing out. + cursor->incrementCursorMetrics(opDebug.additiveMetrics, + CurOp::get(opCtx)->elapsedTimeExcludingPauses().count(), + opDebug.nreturned); +} + } // namespace mongo diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index a7d8c21cb7a..a3acf70d478 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -593,4 +593,13 @@ private: size_t _cursorsTimedOut = 0; }; +/** + * Aggregates telemetry for the current operation via metrics stored on opDebug. If a cursor is + * provided (via ClusterClientCursorGuard or ClusterCursorManager::PinnedCursor), metrics are + * aggregated on the cursor; otherwise, metrics are written directly to the telemetry store. + */ +void collectTelemetryMongos(OperationContext* opCtx); +void collectTelemetryMongos(OperationContext* opCtx, ClusterClientCursorGuard& cursor); +void collectTelemetryMongos(OperationContext* opCtx, ClusterCursorManager::PinnedCursor& cursor); + } // namespace mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 4ef7f79b3f9..85c2a7a4f37 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -459,6 +459,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, if (shardIds.size() > 0) { updateNumHostsTargetedMetrics(opCtx, cm, shardIds.size()); } + collectTelemetryMongos(opCtx); return CursorId(0); } @@ -470,6 +471,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, : ClusterCursorManager::CursorLifetime::Mortal; auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(); ccc->incNBatches(); + collectTelemetryMongos(opCtx, ccc); auto cursorId = uassertStatusOK(cursorManager->registerCursor( opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime, authUser)); @@ -850,8 +852,6 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, "waitWithPinnedCursorDuringGetMoreBatch"); } - telemetry::registerGetMoreRequest(opCtx); - while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { StatusWith<ClusterQueryResult> next = Status{ErrorCodes::InternalError, "uninitialized cluster query result"}; @@ -931,17 +931,19 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, postBatchResumeToken = pinnedCursor.getValue()->getPostBatchResumeToken(); } + // Set nReturned and whether the cursor has been exhausted. + CurOp::get(opCtx)->debug().cursorExhausted = (idToReturn == 0); + CurOp::get(opCtx)->debug().nreturned = batch.size(); + const bool partialResultsReturned = pinnedCursor.getValue()->partialResultsReturned(); pinnedCursor.getValue()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); pinnedCursor.getValue()->incNBatches(); + collectTelemetryMongos(opCtx, pinnedCursor.getValue()); + // Upon successful completion, transfer ownership of the cursor back to the cursor manager. If // the cursor has been exhausted, the cursor manager will clean it up for us. pinnedCursor.getValue().returnCursor(cursorState); - // Set nReturned and whether the cursor has been exhausted. - CurOp::get(opCtx)->debug().cursorExhausted = (idToReturn == 0); - CurOp::get(opCtx)->debug().nreturned = batch.size(); - if (MONGO_unlikely(waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch.shouldFail())) { CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch, diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index fcd1a89dd3d..ab36d7bdad2 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -97,6 +97,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, if (incomingCursorResponse.getValue().getCursorId() == CursorId(0)) { CurOp::get(opCtx)->debug().cursorExhausted = true; + collectTelemetryMongos(opCtx); return cmdResult; } @@ -129,6 +130,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(executor), std::move(params)); ccc->incNBatches(); + collectTelemetryMongos(opCtx, ccc); // We don't expect to use this cursor until a subsequent getMore, so detach from the current // OperationContext until then. ccc->detachFromOperationContext(); |