diff options
18 files changed, 1005 insertions, 684 deletions
diff --git a/jstests/sharding/analyze_shard_key/query_analysis_sample_counters_reporting.js b/jstests/sharding/analyze_shard_key/query_analysis_sample_counters_reporting.js new file mode 100644 index 00000000000..12aa8544846 --- /dev/null +++ b/jstests/sharding/analyze_shard_key/query_analysis_sample_counters_reporting.js @@ -0,0 +1,348 @@ +/** + * Test output of "query analyzer" section of currentOp and serverStatus commands during query + * sampling. + * + * @tags: [requires_fcv_63, featureFlagAnalyzeShardKey] + */ + +(function() { +"use strict"; + +// load("jstests/libs/analyze_shard_key_util.js"); +load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js"); + +const dbName = "testDb"; +const collName = "testColl"; +const nss = dbName + '.' + collName; +const kNumDocs = 10; +const kSampleRate = kNumDocs * 10; + +const opKindRead = 0; +const opKindWrite = 1; + +function getCurrentOpAndServerStatus(st) { + const mongosCurrOp = st.s0.getDB("admin") + .aggregate([ + {$currentOp: {allUsers: true, localOps: true}}, + {$match: {desc: "query analyzer"}} + ]) + .toArray(); + const mongodCurrOp = assert + .commandWorked(st.rs0.getPrimary().adminCommand( + {currentOp: true, desc: "query analyzer"})) + .inprog; + const mongosServerStatus = assert.commandWorked(st.s0.adminCommand({serverStatus: 1})); + const mongodServerStatus = + assert.commandWorked(st.rs0.getPrimary().adminCommand({serverStatus: 1})); + return { + "currentOp": {"mongos": mongosCurrOp, "mongod": mongodCurrOp}, + "serverStatus": { + "mongos": mongosServerStatus.queryAnalyzers, + "mongod": mongodServerStatus.queryAnalyzers + } + }; +} + +/** + * Runs a db command, and compares resulting currentOp and serverStatus to oldState, the + * initial state before running the command. + * Returns the output of currentOp and serverStatus of both mongod and mongos. 
+ */ +function runCommandAndAssertCurrentOpAndServerStatus(st, db, command, opKind, oldState) { + assert.commandWorked(db.runCommand(command)); + + const newState = getCurrentOpAndServerStatus(st); + + assert.eq(newState.currentOp.mongos.length, 1); + assert.eq(newState.currentOp.mongod.length, 1); + if (oldState) { + assert.eq(oldState.currentOp.mongos.length, newState.currentOp.mongos.length); + assert.eq(oldState.currentOp.mongod.length, newState.currentOp.mongod.length); + assert.eq(oldState.serverStatus.mongos.activeCollections, + newState.serverStatus.mongos.activeCollections); + assert.eq(oldState.serverStatus.mongos.totalCollections, + newState.serverStatus.mongos.totalCollections); + assert.eq(oldState.serverStatus.mongod.totalCollections, + newState.serverStatus.mongod.totalCollections); + + if (opKind === opKindRead) { + assert.eq(oldState.currentOp.mongos[0].sampledReadsCount + 1, + newState.currentOp.mongos[0].sampledReadsCount); + assert.eq(oldState.currentOp.mongos[0].sampledWritesCount, + newState.currentOp.mongos[0].sampledWritesCount); + + printjson(newState); + assert.eq(oldState.currentOp.mongod[0].sampledReadsCount + 1, + newState.currentOp.mongod[0].sampledReadsCount); + assert.eq(oldState.currentOp.mongod[0].sampledWritesCount, + newState.currentOp.mongod[0].sampledWritesCount); + // Instead of figuring out the size of the sample being written, just make sure + // that the byte counter is greater than before. 
+ assert.lt(oldState.currentOp.mongod[0].sampledReadsBytes, + newState.currentOp.mongod[0].sampledReadsBytes); + assert.eq(oldState.currentOp.mongod[0].sampledWritesBytes, + newState.currentOp.mongod[0].sampledWritesBytes); + + assert.eq(oldState.serverStatus.mongos.totalSampledReadsCount + 1, + newState.serverStatus.mongos.totalSampledReadsCount); + assert.eq(oldState.serverStatus.mongos.totalSampledWritesCount, + newState.serverStatus.mongos.totalSampledWritesCount); + + assert.eq(oldState.serverStatus.mongod.totalSampledReadsCount + 1, + newState.serverStatus.mongod.totalSampledReadsCount); + assert.eq(oldState.serverStatus.mongod.totalSampledWritesCount, + newState.serverStatus.mongod.totalSampledWritesCount); + assert.lt(oldState.serverStatus.mongod.totalSampledReadsBytes, + newState.serverStatus.mongod.totalSampledReadsBytes); + assert.eq(oldState.serverStatus.mongod.totalSampledWritesBytes, + newState.serverStatus.mongod.totalSampledWritesBytes); + } else if (opKind === opKindWrite) { + assert.eq(oldState.currentOp.mongos[0].sampledReadsCount, + newState.currentOp.mongos[0].sampledReadsCount); + assert.eq(oldState.currentOp.mongos[0].sampledWritesCount + 1, + newState.currentOp.mongos[0].sampledWritesCount); + + assert.eq(oldState.currentOp.mongod[0].sampledReadsCount, + newState.currentOp.mongod[0].sampledReadsCount); + assert.eq(oldState.currentOp.mongod[0].sampledWritesCount + 1, + newState.currentOp.mongod[0].sampledWritesCount); + assert.eq(oldState.currentOp.mongod[0].sampledReadsBytes, + newState.currentOp.mongod[0].sampledReadsBytes); + assert.lt(oldState.currentOp.mongod[0].sampledWritesBytes, + newState.currentOp.mongod[0].sampledWritesBytes); + + assert.eq(oldState.serverStatus.mongos.totalSampledReadsCount, + newState.serverStatus.mongos.totalSampledReadsCount); + assert.eq(oldState.serverStatus.mongos.totalSampledWritesCount + 1, + newState.serverStatus.mongos.totalSampledWritesCount); + + 
assert.eq(oldState.serverStatus.mongod.totalSampledReadsCount, + newState.serverStatus.mongod.totalSampledReadsCount); + assert.eq(oldState.serverStatus.mongod.totalSampledWritesCount + 1, + newState.serverStatus.mongod.totalSampledWritesCount); + assert.eq(oldState.serverStatus.mongod.totalSampledReadsBytes, + newState.serverStatus.mongod.totalSampledReadsBytes); + assert.lt(oldState.serverStatus.mongod.totalSampledWritesBytes, + newState.serverStatus.mongod.totalSampledWritesBytes); + } else { + throw new Error("Unknown operation kind " + opKind); + } + } + return newState; +} + +function testCurrentOpAndServerStatusBasic() { + const st = new ShardingTest({ + shards: 1, + mongos: 1, + rs: { + nodes: 1, + setParameter: { + queryAnalysisWriterIntervalSecs: 1, + } + }, + mongosOptions: { + setParameter: { + queryAnalysisSamplerConfigurationRefreshSecs: 1, + } + } + }); + + const db = st.s.getDB(dbName); + const collection = db.getCollection(collName); + + //// Insert initial documents. + + const bulk = collection.initializeUnorderedBulkOp(); + for (let i = 0; i < kNumDocs; i++) { + bulk.insert({x: i, y: i}); + } + assert.commandWorked(bulk.execute()); + + //// Execute currentOp before any query sampling. + + const startState = getCurrentOpAndServerStatus(st); + assert.eq(startState.currentOp.mongos.length, 0); + assert.eq(startState.currentOp.mongod.length, 0); + + //// Set initial state of counters. 
+ + const initialState = { + currentOp: { + mongos: [{ + desc: "query analyzer", + ns: nss, + sampleRate: kSampleRate, + sampledReadsCount: 0, + sampledWritesCount: 0 + }], + mongod: [{ + desc: "query analyzer", + ns: nss, + sampledReadsCount: 0, + sampledWritesCount: 0, + sampledReadsBytes: 0, + sampledWritesBytes: 0 + }] + }, + serverStatus: { + mongos: { + activeCollections: 1, + totalCollections: 1, + totalSampledReadsCount: 0, + totalSampledWritesCount: 0 + }, + mongod: { + totalCollections: 1, + totalSampledReadsCount: 0, + totalSampledWritesCount: 0, + totalSampledReadsBytes: 0, + totalSampledWritesBytes: 0 + } + } + }; + + //// Start query analysis and send find queries. + + assert.commandWorked( + st.s.adminCommand({configureQueryAnalyzer: nss, mode: "full", sampleRate: kSampleRate})); + + QuerySamplingUtil.waitForActiveSampling(st.s); + + //// Execute different kinds of queries and check counters. + + // Byte size of queries (fourth parameter in calls to doCommandAndAssertCounterState()) are + // determined empirically. 
+ let state = runCommandAndAssertCurrentOpAndServerStatus( + st, db, {find: collName, filter: {x: 1}}, opKindRead, initialState); + state = + runCommandAndAssertCurrentOpAndServerStatus(st, db, {count: collName}, opKindRead, state); + state = runCommandAndAssertCurrentOpAndServerStatus( + st, db, {update: collName, updates: [{q: {x: 1}, u: {updated: true}}]}, opKindWrite, state); + state = runCommandAndAssertCurrentOpAndServerStatus( + st, + db, + {findAndModify: collName, query: {updated: true}, update: {$set: {modified: 1}}}, + opKindWrite, + state); + state = runCommandAndAssertCurrentOpAndServerStatus( + st, db, {delete: collName, deletes: [{q: {x: 1}, limit: 1}]}, opKindWrite, state); + + //// Stop query analysis and check counters + assert.commandWorked(st.s.adminCommand({configureQueryAnalyzer: nss, mode: "off"})); + + let lastState; + assert.soon(() => { + lastState = getCurrentOpAndServerStatus(st); + return 0 == lastState.serverStatus.mongos.activeCollections; + }); + assert.soon(() => { + lastState = getCurrentOpAndServerStatus(st); + return 0 == lastState.currentOp.mongod.length && 0 == lastState.currentOp.mongos.length; + }); + assert.eq(0, lastState.serverStatus.mongos.activeCollections); + assert.eq(0, lastState.serverStatus.mongod.activeCollections); + assert.eq(1, lastState.serverStatus.mongos.totalCollections); + assert.eq(1, lastState.serverStatus.mongod.totalCollections); + assert.eq(state.serverStatus.mongos.totalSampledReadsCount, + lastState.serverStatus.mongos.totalSampledReadsCount); + assert.eq(state.serverStatus.mongos.totalSampledWritesCount, + lastState.serverStatus.mongos.totalSampledWritesCount); + assert.eq(1, lastState.serverStatus.mongos.totalCollections); + assert.eq(state.serverStatus.mongod.totalSampledReadsCount, + lastState.serverStatus.mongod.totalSampledReadsCount); + assert.eq(state.serverStatus.mongod.totalSampledWritesCount, + lastState.serverStatus.mongod.totalSampledWritesCount); + 
assert.eq(state.serverStatus.mongod.totalSampledReadsBytes, + lastState.serverStatus.mongod.totalSampledReadsBytes); + assert.eq(state.serverStatus.mongod.totalSampledWritesBytes, + lastState.serverStatus.mongod.totalSampledWritesBytes); + + st.stop(); +} + +/** + * Tests that mongos reports current information in currentOp even if its sampling rate is 0. + * Specifically, a mongos's local sample rate will be 0 when all queries are being routed through + * other mongos's. It should still report in currentOp the sampling of a collection with sample rate + * of 0. + */ +function testCurrentOpZeroSampleRateMongos() { + const st = new ShardingTest({ + shards: 1, + mongos: [ + { + setParameter: { + queryAnalysisSamplerConfigurationRefreshSecs: 1, + } + }, + { + setParameter: { + // This failpoint will force this mongos to not count any queries or internal + // commands being processed through it. This will force its local sample rate + // to be exactly 0. + "failpoint.overwriteQueryAnalysisSamplerAvgLastCountToZero": + tojson({mode: "alwaysOn"}), + queryAnalysisSamplerConfigurationRefreshSecs: 1, + } + } + ], + rs: { + nodes: 2, + setParameter: { + queryAnalysisWriterIntervalSecs: 1, + } + }, + }); + + const mongos0Db = st.s0.getDB(dbName); + const mongos1Db = st.s1.getDB(dbName); + const mongos0Collection = mongos0Db.getCollection(collName); + + //// Insert initial documents. + + const bulk = mongos0Collection.initializeUnorderedBulkOp(); + for (let i = 0; i < kNumDocs; i++) { + bulk.insert({x: i, y: i}); + } + assert.commandWorked(bulk.execute()); + + //// Turn on sampling and execute a bunch of queries through mongos0. 
+ + assert.commandWorked( + st.s.adminCommand({configureQueryAnalyzer: nss, mode: "full", sampleRate: 1000.0})); + + QuerySamplingUtil.waitForActiveSampling(st.s); + + let state; + for (let i = 0; i < kNumDocs * 2; i++) { + state = runCommandAndAssertCurrentOpAndServerStatus( + st, mongos0Db, {find: collName, filter: {x: i % kNumDocs}}, opKindRead, state); + } + + // Wait for at least queryAnalysisSamplerConfigurationRefreshSecs so that mongos1 can + // refresh its sample rate. + sleep(2000); + + const mongosCurrOp = assert + .commandWorked(mongos1Db.adminCommand({ + aggregate: 1, + pipeline: [ + {$currentOp: {allUsers: true, localOps: true}}, + {$match: {desc: "query analyzer"}} + ], + cursor: {} + })) + .cursor.firstBatch[0]; + assert.eq(0.0, mongosCurrOp.sampleRate); + assert.eq(0, mongosCurrOp.sampledReadsCount); + assert.eq(0, mongosCurrOp.sampledWritesCount); + + st.stop(); +} + +{ + testCurrentOpAndServerStatusBasic(); + testCurrentOpZeroSampleRateMongos(); +} +})(); diff --git a/jstests/sharding/analyze_shard_key/query_analysis_sampling_current_op_test.js b/jstests/sharding/analyze_shard_key/query_analysis_sampling_current_op_test.js deleted file mode 100644 index 264af4e1c56..00000000000 --- a/jstests/sharding/analyze_shard_key/query_analysis_sampling_current_op_test.js +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Test "query analyzer" section of currentOp command during query sampling. - * - * @tags: [requires_fcv_63, featureFlagAnalyzeShardKey] - */ - -(function() { -"use strict"; - -// load("jstests/libs/analyze_shard_key_util.js"); -load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js"); - -const dbName = "testDb"; -const collName = "collection0"; -const nss = dbName + '.' 
+ collName; -const kNumDocs = 100; -const kSampleRate = 100000; - -function getCurrentOp(st) { - const adminDB = st.s.getDB("admin"); - const mongosResult = adminDB - .aggregate([ - {$currentOp: {allUsers: true, localOps: true}}, - {$match: {desc: "query analyzer"}} - ]) - .toArray(); - const mongodResult = assert - .commandWorked(st.rs0.getPrimary().adminCommand( - {currentOp: true, desc: "query analyzer"})) - .inprog; - return {"mongos": mongosResult, "mongod": mongodResult}; -} - -{ - const st = new ShardingTest({ - shards: 1, - rs: { - nodes: 2, - setParameter: { - queryAnalysisWriterIntervalSecs: 1, - } - }, - mongosOptions: { - setParameter: { - queryAnalysisSamplerConfigurationRefreshSecs: 1, - } - } - }); - - const mydb = st.s.getDB(dbName); - const collection = mydb.getCollection(collName); - - //// Insert initial documents. - - const bulk = collection.initializeUnorderedBulkOp(); - for (let i = 0; i < kNumDocs; i++) { - bulk.insert({x: i, y: i}); - } - assert.commandWorked(bulk.execute()); - - //// Execute currentOp before any query sampling. - - let result = getCurrentOp(st); - let mongosResult = result.mongos; - let mongodResult = result.mongod; - assert.eq(mongosResult.length, 0); - assert.eq(mongodResult.length, 0); - - let expectedReadsCount = 0; - let expectedWritesCount = 0; - let expectedReadsBytes = 0; - let expectedWritesBytes = 0; - - //// Start query analysis and send find queries. - assert.commandWorked( - st.s.adminCommand({configureQueryAnalyzer: nss, mode: "full", sampleRate: kSampleRate})); - - QuerySamplingUtil.waitForActiveSampling(st.s); - - for (let i = 0; i < kNumDocs; i++) { - assert.commandWorked(mydb.runCommand({find: collName, filter: {x: i}})); - ++expectedReadsCount; - // The sample document size is determined empirically. 
- expectedReadsBytes += 181; - } - - result = getCurrentOp(st); - mongosResult = result.mongos; - mongodResult = result.mongod; - - assert.eq(mongosResult.length, 1); - assert.eq(mongosResult[0].sampleRate, kSampleRate); - assert.eq(mongosResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongosResult[0].sampledWritesCount, expectedWritesCount); - - assert.eq(mongodResult.length, 1); - assert.eq(mongodResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongodResult[0].sampledReadsBytes, expectedReadsBytes); - assert.eq(mongodResult[0].sampledWritesCount, expectedWritesCount); - assert.eq(mongodResult[0].sampledWritesBytes, expectedWritesBytes); - - //// Send update queries. - - for (let i = 0; i < kNumDocs; ++i) { - assert.commandWorked( - mydb.runCommand({update: collName, updates: [{q: {x: i}, u: {updated: true}}]})); - ++expectedWritesCount; - // The sample document size is determined empirically. - expectedWritesBytes += 327; - } - - result = getCurrentOp(st); - mongosResult = result.mongos; - mongodResult = result.mongod; - - assert.eq(mongosResult.length, 1); - assert.eq(mongosResult[0].sampleRate, kSampleRate); - assert.eq(mongosResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongosResult[0].sampledWritesCount, expectedWritesCount); - - assert.eq(mongodResult.length, 1); - assert.eq(mongodResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongodResult[0].sampledReadsBytes, expectedReadsBytes); - assert.eq(mongodResult[0].sampledWritesCount, expectedWritesCount); - assert.eq(mongodResult[0].sampledWritesBytes, expectedWritesBytes); - - //// Send findAndModify queries. - - for (let i = 0; i < kNumDocs; ++i) { - const result = assert.commandWorked(mydb.runCommand( - {findAndModify: collName, query: {updated: true}, update: {$set: {modified: 1}}})); - ++expectedWritesCount; - // The sample document size is determined empirically. 
- expectedWritesBytes += 292; - } - - result = getCurrentOp(st); - mongosResult = result.mongos; - mongodResult = result.mongod; - - assert.eq(mongosResult.length, 1); - assert.eq(mongosResult[0].sampleRate, kSampleRate); - assert.eq(mongosResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongosResult[0].sampledWritesCount, expectedWritesCount); - - assert.eq(mongodResult.length, 1); - assert.eq(mongodResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongodResult[0].sampledReadsBytes, expectedReadsBytes); - assert.eq(mongodResult[0].sampledWritesCount, expectedWritesCount); - assert.eq(mongodResult[0].sampledWritesBytes, expectedWritesBytes); - - //// Send delete queries. - - for (let i = 0; i < kNumDocs; ++i) { - assert.commandWorked(mydb.runCommand({delete: collName, deletes: [{q: {x: i}, limit: 1}]})); - ++expectedWritesCount; - // The sample document size is determined empirically. - expectedWritesBytes += 303; - } - - result = getCurrentOp(st); - mongosResult = result.mongos; - mongodResult = result.mongod; - - assert.eq(mongosResult.length, 1); - assert.eq(mongosResult[0].sampleRate, kSampleRate); - assert.eq(mongosResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongosResult[0].sampledWritesCount, expectedWritesCount); - - assert.eq(mongodResult.length, 1); - assert.eq(mongodResult[0].sampledReadsCount, expectedReadsCount); - assert.eq(mongodResult[0].sampledReadsBytes, expectedReadsBytes); - assert.eq(mongodResult[0].sampledWritesCount, expectedWritesCount); - assert.eq(mongodResult[0].sampledWritesBytes, expectedWritesBytes); - - st.stop(); -} -})(); diff --git a/jstests/sharding/analyze_shard_key/refresh_sample_rates_sharded.js b/jstests/sharding/analyze_shard_key/refresh_sample_rates_sharded.js index 8dd73a0ddb1..66f021c05c6 100644 --- a/jstests/sharding/analyze_shard_key/refresh_sample_rates_sharded.js +++ b/jstests/sharding/analyze_shard_key/refresh_sample_rates_sharded.js @@ -148,7 +148,11 @@ const collUuid1 = 
getCollectionUuid(collName1); name: st.s1.host, numQueriesExecutedPerSecond: 0 })); - assert.eq(res1.configurations.length, 0); + assert.eq(res1.configurations.length, 2); + assert.sameMembers(res1.configurations, [ + {ns: ns0, collectionUuid: collUuid0, sampleRate: 0}, + {ns: ns1, collectionUuid: collUuid1, sampleRate: 0}, + ]); assert.commandWorked(st.s0.adminCommand({configureQueryAnalyzer: ns1, mode: "off"})); diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index cdcff7f24fe..11f52ae723c 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -75,6 +75,7 @@ #include "mongo/logv2/log.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/query_analysis_sample_counters.h" #include "mongo/s/query_analysis_sampler_util.h" #include "mongo/util/database_name_util.h" #include "mongo/util/namespace_string_util.h" @@ -699,7 +700,7 @@ void CommonMongodProcessInterface::_reportCurrentOpsForIdleSessions( void CommonMongodProcessInterface::_reportCurrentOpsForQueryAnalysis( OperationContext* opCtx, std::vector<BSONObj>* ops) const { if (analyze_shard_key::supportsPersistingSampledQueries()) { - analyze_shard_key::QueryAnalysisWriter::get(opCtx)->reportForCurrentOp(ops); + analyze_shard_key::QueryAnalysisSampleCounters::get(opCtx).reportForCurrentOp(ops); } } diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index c5f6790b6ac..b82d13ff4c6 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/read_concern_args.h" #include 
"mongo/executor/task_executor_pool.h" #include "mongo/logv2/redaction.h" +#include "mongo/s/analyze_shard_key_role.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" @@ -47,7 +48,7 @@ #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" -#include "mongo/s/query_analysis_sampler.h" +#include "mongo/s/query_analysis_sample_counters.h" #include "mongo/s/stale_shard_version_helpers.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" @@ -293,7 +294,7 @@ void MongosProcessInterface::_reportCurrentOpsForPrimaryOnlyServices( void MongosProcessInterface::_reportCurrentOpsForQueryAnalysis(OperationContext* opCtx, std::vector<BSONObj>* ops) const { if (analyze_shard_key::supportsSamplingQueries()) { - analyze_shard_key::QueryAnalysisSampler::get(opCtx).reportForCurrentOp(ops); + analyze_shard_key::QueryAnalysisSampleCounters::get(opCtx).reportForCurrentOp(ops); } } diff --git a/src/mongo/db/s/query_analysis_coordinator.cpp b/src/mongo/db/s/query_analysis_coordinator.cpp index 1e3cf284c0e..818b611ccaa 100644 --- a/src/mongo/db/s/query_analysis_coordinator.cpp +++ b/src/mongo/db/s/query_analysis_coordinator.cpp @@ -261,10 +261,6 @@ QueryAnalysisCoordinator::getNewConfigurationsForSampler(OperationContext* opCtx ? (1.0 / numActiveSamplers) : (weight / totalWeight); - if (sampleRateRatio == 0) { - return {}; - } - // Populate the query analyzer configurations for all collections. 
std::vector<CollectionQueryAnalyzerConfiguration> configurations; for (const auto& [_, configuration] : _configurations) { diff --git a/src/mongo/db/s/query_analysis_coordinator_test.cpp b/src/mongo/db/s/query_analysis_coordinator_test.cpp index 58cdad04509..76bd5a20c34 100644 --- a/src/mongo/db/s/query_analysis_coordinator_test.cpp +++ b/src/mongo/db/s/query_analysis_coordinator_test.cpp @@ -724,8 +724,8 @@ TEST_F(QueryAnalysisCoordinatorTest, GetNewConfigurationsMultipleSamplersBasic) // Query distribution after: [1.5, 0]. configurations1 = coordinator->getNewConfigurationsForSampler(operationContext(), mongosName1, 0); - // The weight for this mongos is 0 so no configurations should be returned. - ASSERT(configurations1.empty()); + assertContainsConfiguration( + configurations1, analyzerDoc0.getNs(), analyzerDoc0.getCollectionUuid(), 0.0); // Query distribution after: [0, 0]. configurations0 = diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp index d0b508ae366..91433f6ffea 100644 --- a/src/mongo/db/s/query_analysis_writer.cpp +++ b/src/mongo/db/s/query_analysis_writer.cpp @@ -540,8 +540,8 @@ ExecutorFuture<void> QueryAnalysisWriter::_addReadQuery(const UUID& sampleId, stdx::lock_guard<Latch> lk(_mutex); if (_queries.add(doc)) { - auto counters = _getOrCreateSampleCounters(nss, *collUuid); - counters->incrementReads(doc.objsize()); + QueryAnalysisSampleCounters::get(opCtx).incrementReads( + nss, *collUuid, doc.objsize()); } }) .then([this] { @@ -590,8 +590,8 @@ ExecutorFuture<void> QueryAnalysisWriter::addUpdateQuery( stdx::lock_guard<Latch> lk(_mutex); if (_queries.add(doc)) { - auto counters = _getOrCreateSampleCounters(sampledUpdateCmd.nss, *collUuid); - counters->incrementWrites(doc.objsize()); + QueryAnalysisSampleCounters::get(opCtx).incrementWrites( + sampledUpdateCmd.nss, *collUuid, doc.objsize()); } }) .then([this] { @@ -640,8 +640,8 @@ ExecutorFuture<void> QueryAnalysisWriter::addDeleteQuery( 
stdx::lock_guard<Latch> lk(_mutex); if (_queries.add(doc)) { - auto counters = _getOrCreateSampleCounters(sampledDeleteCmd.nss, *collUuid); - counters->incrementWrites(doc.objsize()); + QueryAnalysisSampleCounters::get(opCtx).incrementWrites( + sampledDeleteCmd.nss, *collUuid, doc.objsize()); } }) .then([this] { @@ -692,8 +692,8 @@ ExecutorFuture<void> QueryAnalysisWriter::addFindAndModifyQuery( stdx::lock_guard<Latch> lk(_mutex); if (_queries.add(doc)) { - auto counters = _getOrCreateSampleCounters(sampledFindAndModifyCmd.nss, *collUuid); - counters->incrementWrites(doc.objsize()); + QueryAnalysisSampleCounters::get(opCtx).incrementWrites( + sampledFindAndModifyCmd.nss, *collUuid, doc.objsize()); } }) .then([this] { @@ -755,26 +755,5 @@ ExecutorFuture<void> QueryAnalysisWriter::addDiff(const UUID& sampleId, }); } -void QueryAnalysisWriter::reportForCurrentOp(std::vector<BSONObj>* ops) const { - for (auto it = _sampleCountersMap.begin(); it != _sampleCountersMap.end(); ++it) { - ops->push_back(it->second->reportCurrentOp()); - } -} - -std::shared_ptr<SampleCounters> QueryAnalysisWriter::_getOrCreateSampleCounters( - const NamespaceString& nss, const UUID& collUuid) { - auto it = _sampleCountersMap.find(collUuid); - if (it == _sampleCountersMap.end()) { - it = _sampleCountersMap.emplace(collUuid, std::make_shared<SampleCounters>(nss, collUuid)) - .first; - } else { - if (nss != it->second->getNss()) { - // TODO SERVER-73990 Make sure collection renames are handled correctly, and test. 
- it->second->setNss(nss); - } - } - return it->second; -} - } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/db/s/query_analysis_writer.h b/src/mongo/db/s/query_analysis_writer.h index 0412da774fd..3ea38483b09 100644 --- a/src/mongo/db/s/query_analysis_writer.h +++ b/src/mongo/db/s/query_analysis_writer.h @@ -194,8 +194,6 @@ public: _flushDiffs(opCtx); } - void reportForCurrentOp(std::vector<BSONObj>* ops) const; - private: bool shouldRegisterReplicaSetAwareService() const override final; @@ -239,14 +237,6 @@ private: */ bool _exceedsMaxSizeBytes(); - /** - * Retrieve the collection's sample counters given the namespace string and the - * collection UUID. If the collection's sample counters are not found, a new set of - * counters is created for the collection and returned. - */ - std::shared_ptr<SampleCounters> _getOrCreateSampleCounters(const NamespaceString& nss, - const UUID& collUuid); - mutable Mutex _mutex = MONGO_MAKE_LATCH("QueryAnalysisWriter::_mutex"); PeriodicJobAnchor _periodicQueryWriter; @@ -255,8 +245,6 @@ private: PeriodicJobAnchor _periodicDiffWriter; Buffer _diffs{NamespaceString::kConfigSampledQueriesDiffNamespace}; - std::map<UUID, std::shared_ptr<SampleCounters>> _sampleCountersMap; - // Initialized on startup and joined on shutdown. std::shared_ptr<executor::TaskExecutor> _executor; }; diff --git a/src/mongo/db/s/query_analysis_writer_test.cpp b/src/mongo/db/s/query_analysis_writer_test.cpp index 8fb0e73a20d..2cc6ddf87db 100644 --- a/src/mongo/db/s/query_analysis_writer_test.cpp +++ b/src/mongo/db/s/query_analysis_writer_test.cpp @@ -411,16 +411,6 @@ protected: ASSERT_BSONOBJ_EQ(parsedCmd.toBSON({}), expectedCmd.toBSON({})); } - /* Asserts that there is a sampled write query document with the given sample id and - * the given size in bytes. 
- */ - int assertSampledQueryDocumentSize(const UUID& sampleId, long long size) { - auto doc = _getConfigDocument(NamespaceString::kConfigSampledQueriesNamespace, sampleId); - auto docSize = doc.objsize(); - ASSERT_EQ(docSize, size); - return docSize; - } - /* * Returns the number of the documents for the collection 'nss' in the config.sampledQueriesDiff * collection. @@ -1405,107 +1395,6 @@ TEST_F(QueryAnalysisWriterTest, DiffExceedsSizeLimit) { ASSERT_EQ(getDiffDocumentsCount(nss0), 0); } -TEST_F(QueryAnalysisWriterTest, ReportForCurrentOpForRead) { - auto& writer = *QueryAnalysisWriter::get(operationContext()); - - auto collUuid0 = getCollectionUUID(nss0); - auto sampleId = UUID::gen(); - - // Write a sampled query. - writer.addFindQuery(sampleId, nss0, makeNonEmptyFilter(), emptyCollation).get(); - writer.flushQueriesForTest(operationContext()); - - // Get currentOp report. - std::vector<BSONObj> reps; - writer.reportForCurrentOp(&reps); - ASSERT_EQ(reps.size(), 1); - ASSERT_EQ(reps[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(reps[0].getField(SampleCounters::kNamespaceStringFieldName).String(), - nss0.toString()); - ASSERT_EQ(UUID::parse(reps[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 1); - auto expectedReadsBytes = assertSampledQueryDocumentSize( - sampleId, reps[0].getField(SampleCounters::kSampledReadsBytesFieldName).Long()); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesBytesFieldName).Long(), 0); - - // Attempting to sample a query that is too large will not be counted. 
- sampleId = UUID::gen(); - BSONObjBuilder bobFilter = makeNonEmptyFilter(); - bobFilter.append(std::string(BSONObjMaxUserSize, 'a'), 1); - - writer.addFindQuery(sampleId, nss0, bobFilter.obj(), emptyCollation).get(); - writer.flushQueriesForTest(operationContext()); - - // Get currentOp report. - reps.clear(); - writer.reportForCurrentOp(&reps); - ASSERT_EQ(reps.size(), 1); - ASSERT_EQ(reps[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(reps[0].getField(SampleCounters::kNamespaceStringFieldName).String(), - nss0.toString()); - ASSERT_EQ(UUID::parse(reps[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 1); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsBytesFieldName).Long(), - expectedReadsBytes); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesBytesFieldName).Long(), 0); -} - -TEST_F(QueryAnalysisWriterTest, ReportForCurrentOpForWrite) { - auto& writer = *QueryAnalysisWriter::get(operationContext()); - - auto collUuid0 = getCollectionUUID(nss0); - - auto [originalCmd, expectedSampledCmds] = makeUpdateCommandRequest(nss0, 3, {0}); - ASSERT_EQ(expectedSampledCmds.size(), 1); - - // Write a sampled query. - writer.addUpdateQuery(originalCmd, 0).get(); - writer.flushQueriesForTest(operationContext()); - - // Get currentOp report. 
- std::vector<BSONObj> reps; - writer.reportForCurrentOp(&reps); - ASSERT_EQ(reps.size(), 1); - ASSERT_EQ(reps[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(reps[0].getField(SampleCounters::kNamespaceStringFieldName).String(), - nss0.toString()); - ASSERT_EQ(UUID::parse(reps[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsBytesFieldName).Long(), 0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 1); - auto expectedWritesBytes = assertSampledQueryDocumentSize( - expectedSampledCmds.begin()->first, - reps[0].getField(SampleCounters::kSampledWritesBytesFieldName).Long()); - - // Attempting to sample a query that is too large will not be counted. - auto [originalCmd2, expectedSampledCmds2] = - makeUpdateCommandRequest(nss0, 3, {0}, std::string(BSONObjMaxUserSize, 'a')); - ASSERT_EQ(expectedSampledCmds2.size(), 1); - - writer.addUpdateQuery(originalCmd2, 0).get(); - writer.flushQueriesForTest(operationContext()); - - // Get currentOp report. 
- reps.clear(); - writer.reportForCurrentOp(&reps); - ASSERT_EQ(reps.size(), 1); - ASSERT_EQ(reps[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(reps[0].getField(SampleCounters::kNamespaceStringFieldName).String(), - nss0.toString()); - ASSERT_EQ(UUID::parse(reps[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledReadsBytesFieldName).Long(), 0); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 1); - ASSERT_EQ(reps[0].getField(SampleCounters::kSampledWritesBytesFieldName).Long(), - expectedWritesBytes); -} - } // namespace } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/s/analyze_shard_key_common.idl b/src/mongo/s/analyze_shard_key_common.idl index 27307ae8e85..a41f48ba30c 100644 --- a/src/mongo/s/analyze_shard_key_common.idl +++ b/src/mongo/s/analyze_shard_key_common.idl @@ -81,3 +81,63 @@ structs: sampleRate: type: double description: "The maximum number of queries to sample per second." + + CollectionSampleCountersCurrentOp: + description: "The per-collection report on number and size of sampled queries for the currentOp + command." + strict: false + fields: + desc: + type: string + description: "The string identifier for the query sampling currentOp document." + default: '"query analyzer"' + ns: + type: namespacestring + description: "The namespace of the collection." + collUuid: + type: uuid + description: "The UUID of the collection." + optional: true + sampleRate: + type: double + description: "The maximum number of queries to sample per second. Only reported by mongos." + optional: true + sampledReadsCount: + type: long + description: "The number of read queries sampled." + sampledReadsBytes: + type: long + description: "The number of bytes stored for sampled read queries. 
Only reported by mongod." + optional: true + sampledWritesCount: + type: long + description: "The number of write queries sampled." + sampledWritesBytes: + type: long + description: "The number of bytes stored for sampled write queries. Only reported by mongod." + optional: true + QueryAnalysisServerStatus: + description: "The server-wide report on the number of sampled queries for the serverStatus + command." + strict: false + fields: + activeCollections: + type: long + description: "The number of collections that are actively being sampled." + totalCollections: + type: long + description: "The total number of collections that have been sampled." + totalSampledReadsCount: + type: long + description: "The total number of sampled read queries." + totalSampledWritesCount: + type: long + description: "The total number of sampled write queries." + totalSampledReadsBytes: + type: long + description: "The total number of bytes stored for sampled read queries." + optional: true + totalSampledWritesBytes: + type: long + description: "The total number of bytes stored for sampled write queries." 
+ optional: true diff --git a/src/mongo/s/query_analysis_sample_counters.cpp b/src/mongo/s/query_analysis_sample_counters.cpp index 13f6528fa3e..199c90ed40e 100644 --- a/src/mongo/s/query_analysis_sample_counters.cpp +++ b/src/mongo/s/query_analysis_sample_counters.cpp @@ -29,34 +29,127 @@ #include "mongo/s/query_analysis_sample_counters.h" -#include "mongo/bson/bsonobj.h" +#include "mongo/s/analyze_shard_key_common_gen.h" #include "mongo/s/is_mongos.h" namespace mongo { namespace analyze_shard_key { +namespace { -const std::string SampleCounters::kDescriptionFieldName("desc"); -const std::string SampleCounters::kDescriptionFieldValue("query analyzer"); -const std::string SampleCounters::kNamespaceStringFieldName("ns"); -const std::string SampleCounters::kCollUuidFieldName("collUuid"); -const std::string SampleCounters::kSampledReadsCountFieldName("sampledReadsCount"); -const std::string SampleCounters::kSampledWritesCountFieldName("sampledWritesCount"); -const std::string SampleCounters::kSampledReadsBytesFieldName("sampledReadsBytes"); -const std::string SampleCounters::kSampledWritesBytesFieldName("sampledWritesBytes"); -const std::string SampleCounters::kSampleRateFieldName("sampleRate"); - -BSONObj SampleCounters::reportCurrentOp() const { - BSONObjBuilder bob; - bob.append(kDescriptionFieldName, kDescriptionFieldValue); - bob.append(kNamespaceStringFieldName, _nss.toString()); - _collUuid.appendToBuilder(&bob, kCollUuidFieldName); - bob.append(kSampledReadsCountFieldName, _sampledReadsCount); - bob.append(kSampledWritesCountFieldName, _sampledWritesCount); +const auto getQueryAnalysisSampleCounters = + ServiceContext::declareDecoration<QueryAnalysisSampleCounters>(); + +} // namespace + +QueryAnalysisSampleCounters& QueryAnalysisSampleCounters::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +QueryAnalysisSampleCounters& QueryAnalysisSampleCounters::get(ServiceContext* serviceContext) { + return 
getQueryAnalysisSampleCounters(serviceContext); +} + +void QueryAnalysisSampleCounters::refreshConfigurations( + const std::vector<CollectionQueryAnalyzerConfiguration>& configurations) { + stdx::lock_guard<Latch> lk(_mutex); + std::map<NamespaceString, + std::shared_ptr<QueryAnalysisSampleCounters::CollectionSampleCounters>> + newSampleCounters; + + for (const auto& configuration : configurations) { + auto it = _sampleCounters.find(configuration.getNs()); + if (it == _sampleCounters.end() || + it->second->getCollUuid() != configuration.getCollectionUuid()) { + newSampleCounters.emplace(std::make_pair( + configuration.getNs(), + std::make_shared<CollectionSampleCounters>(configuration.getNs(), + configuration.getCollectionUuid(), + configuration.getSampleRate()))); + } else { + it->second->setSampleRate(configuration.getSampleRate()); + newSampleCounters.emplace(std::make_pair(configuration.getNs(), it->second)); + } + _sampledNamespaces.insert(configuration.getNs()); + } + _sampleCounters = std::move(newSampleCounters); +} + +void QueryAnalysisSampleCounters::incrementReads(const NamespaceString& nss, + const boost::optional<UUID>& collUuid, + boost::optional<int64_t> size) { + stdx::lock_guard<Latch> lk(_mutex); + auto counters = _getOrCreateCollectionSampleCounters(lk, nss, collUuid); + counters->incrementReads(size); + ++_totalSampledReadsCount; + if (size) { + _totalSampledReadsBytes += *size; + } +} + +void QueryAnalysisSampleCounters::incrementWrites(const NamespaceString& nss, + const boost::optional<UUID>& collUuid, + boost::optional<int64_t> size) { + stdx::lock_guard<Latch> lk(_mutex); + auto counters = _getOrCreateCollectionSampleCounters(lk, nss, collUuid); + counters->incrementWrites(size); + ++_totalSampledWritesCount; + if (size) { + _totalSampledWritesBytes += *size; + } +} + +std::shared_ptr<QueryAnalysisSampleCounters::CollectionSampleCounters> +QueryAnalysisSampleCounters::_getOrCreateCollectionSampleCounters( + WithLock, const NamespaceString& 
nss, const boost::optional<UUID>& collUuid) { + auto it = _sampleCounters.find(nss); + if (it == _sampleCounters.end()) { + // Do not create a new set of counters without collUuid specified: + invariant(collUuid); + it = _sampleCounters + .emplace(std::make_pair( + nss, + std::make_shared<QueryAnalysisSampleCounters::CollectionSampleCounters>( + nss, *collUuid))) + .first; + _sampledNamespaces.insert(nss); + } + return it->second; +} + +void QueryAnalysisSampleCounters::reportForCurrentOp(std::vector<BSONObj>* ops) const { + stdx::lock_guard<Latch> lk(_mutex); + for (auto const& it : _sampleCounters) { + ops->push_back(it.second->reportForCurrentOp()); + } +} + +BSONObj QueryAnalysisSampleCounters::CollectionSampleCounters::reportForCurrentOp() const { + CollectionSampleCountersCurrentOp report; + report.setNs(_nss); + report.setCollUuid(_collUuid); + report.setSampledReadsCount(_sampledReadsCount); + report.setSampledWritesCount(_sampledWritesCount); + if (isMongos()) { + report.setSampleRate(_sampleRate); + } else { + report.setSampledReadsBytes(_sampledReadsBytes); + report.setSampledWritesBytes(_sampledWritesBytes); + } + + return report.toBSON(); +} + +BSONObj QueryAnalysisSampleCounters::reportForServerStatus() const { + QueryAnalysisServerStatus res; + res.setActiveCollections(static_cast<int64_t>(_sampleCounters.size())); + res.setTotalCollections(static_cast<int64_t>(_sampledNamespaces.size())); + res.setTotalSampledReadsCount(_totalSampledReadsCount); + res.setTotalSampledWritesCount(_totalSampledWritesCount); if (!isMongos()) { - bob.append(kSampledReadsBytesFieldName, _sampledReadsBytes); - bob.append(kSampledWritesBytesFieldName, _sampledWritesBytes); + res.setTotalSampledReadsBytes(_totalSampledReadsBytes); + res.setTotalSampledWritesBytes(_totalSampledWritesBytes); } - return bob.obj(); + return res.toBSON(); } } // namespace analyze_shard_key diff --git a/src/mongo/s/query_analysis_sample_counters.h b/src/mongo/s/query_analysis_sample_counters.h 
index a80f2bcece0..2cf8b68a098 100644 --- a/src/mongo/s/query_analysis_sample_counters.h +++ b/src/mongo/s/query_analysis_sample_counters.h @@ -30,6 +30,12 @@ #pragma once #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/mutex.h" +#include "mongo/s/analyze_shard_key_common_gen.h" +#include "mongo/util/concurrency/with_lock.h" #include "mongo/util/uuid.h" #include <boost/optional.hpp> @@ -39,83 +45,136 @@ namespace mongo { namespace analyze_shard_key { -class SampleCounters { +/** + * Maintains read and write counters of queries being sampled for shard key analysis. This includes + * server-wide counters and per-collection counters for the collections that have query sampling + * enabled. Instances of this object on mongod will also count the number of bytes being + * written to the sample collection. + */ +class QueryAnalysisSampleCounters { public: - static const std::string kDescriptionFieldName; - static const std::string kDescriptionFieldValue; - static const std::string kNamespaceStringFieldName; - static const std::string kCollUuidFieldName; - static const std::string kSampledReadsCountFieldName; - static const std::string kSampledWritesCountFieldName; - static const std::string kSampledReadsBytesFieldName; - static const std::string kSampledWritesBytesFieldName; - static const std::string kSampleRateFieldName; - - SampleCounters(const NamespaceString& nss, const UUID& collUuid) - : _nss(nss), - _collUuid(collUuid), - _sampledReadsCount(0LL), - _sampledReadsBytes(0LL), - _sampledWritesCount(0LL), - _sampledWritesBytes(0LL){}; - - NamespaceString getNss() const { - return _nss; - } - - void setNss(const NamespaceString& nss) { - _nss = nss; - } - - UUID getCollUUID() const { - return _collUuid; - } - - size_t getSampledReadsCount() const { - return _sampledReadsCount; - } - - size_t getSampledReadsBytes() const { - return 
_sampledReadsBytes; - } - - size_t getSampledWritesCount() const { - return _sampledWritesCount; - } - - size_t getSampledWritesBytes() const { - return _sampledWritesBytes; - } + class CollectionSampleCounters { + public: + CollectionSampleCounters(const NamespaceString& nss, + const UUID& collUuid, + double sampleRate = 0.0) + : _nss(nss), _collUuid(collUuid), _sampleRate(sampleRate){}; + + NamespaceString getNs() const { + return _nss; + } + + UUID getCollUuid() const { + return _collUuid; + } + + double getSampleRate() const { + return _sampleRate; + } + + void setSampleRate(double sampleRate) { + _sampleRate = sampleRate; + } + + int64_t getSampledReadsCount() const { + return _sampledReadsCount; + } + + int64_t getSampledReadsBytes() const { + return _sampledReadsBytes; + } + + int64_t getSampledWritesCount() const { + return _sampledWritesCount; + } + + int64_t getSampledWritesBytes() const { + return _sampledWritesBytes; + } + + /** + * Increments the read counter and adds <size> to the read bytes counter. + */ + void incrementReads(boost::optional<int64_t> size = boost::none) { + ++_sampledReadsCount; + if (size) { + _sampledReadsBytes += *size; + } + } + + /** + * Increments the write counter and adds <size> to the write bytes counter. + */ + void incrementWrites(boost::optional<int64_t> size = boost::none) { + ++_sampledWritesCount; + if (size) { + _sampledWritesBytes += *size; + } + } + + BSONObj reportForCurrentOp() const; + + private: + NamespaceString _nss; + UUID _collUuid; + int64_t _sampledReadsCount = 0; + int64_t _sampledReadsBytes = 0; + int64_t _sampledWritesCount = 0; + int64_t _sampledWritesBytes = 0; + double _sampleRate = 0.0; + }; + + QueryAnalysisSampleCounters() {} /** - * Increments the read counter and adds <size> to the read bytes counter. + * Returns a reference to the service-wide QueryAnalysisSampleCounters instance. 
*/ - void incrementReads(boost::optional<long long> size) { - ++_sampledReadsCount; - if (size) { - _sampledReadsBytes += *size; - } - } + static QueryAnalysisSampleCounters& get(OperationContext* opCtx); + static QueryAnalysisSampleCounters& get(ServiceContext* serviceContext); + + void refreshConfigurations( + const std::vector<CollectionQueryAnalyzerConfiguration>& configurations); /** - * Increments the write counter and adds <size> to the write bytes counter. + * Retrieves the collection's sample counters given the namespace string and the collection + * UUID. If the collection's sample counters do not exist, new counters are created for the + * collection and returned. */ - void incrementWrites(boost::optional<long long> size) { - ++_sampledWritesCount; - if (size) { - _sampledWritesBytes += *size; - } - } + void incrementReads(const NamespaceString& nss, + const boost::optional<UUID>& collUuid = boost::none, + boost::optional<int64_t> size = boost::none); + void incrementWrites(const NamespaceString& nss, + const boost::optional<UUID>& collUuid = boost::none, + boost::optional<int64_t> size = boost::none); - BSONObj reportCurrentOp() const; + /** + * Reports sample counters for each collection, inserting one BSONObj per collection. + */ + void reportForCurrentOp(std::vector<BSONObj>* ops) const; + + /** + * Reports number of queries sampled over the lifetime of the server. 
+ */ + BSONObj reportForServerStatus() const; private: - NamespaceString _nss; - const UUID _collUuid; - long long _sampledReadsCount; - long long _sampledReadsBytes; - long long _sampledWritesCount; - long long _sampledWritesBytes; + std::shared_ptr<CollectionSampleCounters> _getOrCreateCollectionSampleCounters( + WithLock, const NamespaceString& nss, const boost::optional<UUID>& collUuid); + + mutable Mutex _mutex = MONGO_MAKE_LATCH("QueryAnalysisSampleCounters::_mutex"); + + int64_t _totalSampledReadsCount = 0; + int64_t _totalSampledWritesCount = 0; + int64_t _totalSampledReadsBytes = 0; + int64_t _totalSampledWritesBytes = 0; + + // Per-collection sample counters. When sampling for a collection is turned off, + // its counters will be removed from this map. + std::map<NamespaceString, std::shared_ptr<CollectionSampleCounters>> _sampleCounters; + + // Set of collections that have been sampled, for maintaining the total count of + // collections sampled, reported in server status. + std::set<NamespaceString> _sampledNamespaces; }; } // namespace analyze_shard_key diff --git a/src/mongo/s/query_analysis_sample_counters_test.cpp b/src/mongo/s/query_analysis_sample_counters_test.cpp index b16881071ad..e6650a15dcf 100644 --- a/src/mongo/s/query_analysis_sample_counters_test.cpp +++ b/src/mongo/s/query_analysis_sample_counters_test.cpp @@ -27,13 +27,21 @@ * it in the license file. 
*/ +#include "mongo/s/query_analysis_sample_counters.h" + #include "mongo/db/namespace_string.h" +#include "mongo/db/service_context_test_fixture.h" #include "mongo/logv2/log.h" -#include "mongo/s/query_analysis_sample_counters.h" +#include "mongo/s/analyze_shard_key_common_gen.h" +#include "mongo/s/is_mongos.h" +#include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/bson_test_util.h" +#include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" #include <boost/none.hpp> +#include <boost/optional.hpp> #include <cstddef> #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest @@ -44,38 +52,308 @@ namespace { const NamespaceString nss0 = NamespaceString::createNamespaceString_forTest("testDb", "testColl0"); -TEST(QueryAnalysisSampleCounters, CountAndReportCurrentOp) { - auto collUuid = UUID::gen(); - auto counters = SampleCounters(nss0, collUuid); - ASSERT_EQ(nss0, counters.getNss()); - ASSERT_EQ(collUuid, counters.getCollUUID()); - ASSERT_EQ(0, counters.getSampledReadsCount()); - ASSERT_EQ(0, counters.getSampledReadsBytes()); - ASSERT_EQ(0, counters.getSampledWritesCount()); - ASSERT_EQ(0, counters.getSampledWritesBytes()); - - long long size = 100LL; - - counters.incrementReads(size); - counters.incrementReads(boost::none); - ASSERT_EQ(2, counters.getSampledReadsCount()); - ASSERT_EQ(size, counters.getSampledReadsBytes()); - - counters.incrementWrites(size); - counters.incrementWrites(boost::none); - ASSERT_EQ(2, counters.getSampledWritesCount()); - ASSERT_EQ(size, counters.getSampledWritesBytes()); - - BSONObj report = counters.reportCurrentOp(); - - ASSERT_EQ(report.getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(report.getField(SampleCounters::kNamespaceStringFieldName).String(), nss0.toString()); - ASSERT_EQ(UUID::parse(report.getField(SampleCounters::kCollUuidFieldName)), collUuid); - 
ASSERT_EQ(report.getField(SampleCounters::kSampledReadsCountFieldName).Long(), 2); - ASSERT_EQ(report.getField(SampleCounters::kSampledWritesCountFieldName).Long(), 2); - ASSERT_EQ(report.getField(SampleCounters::kSampledReadsBytesFieldName).Long(), size); - ASSERT_EQ(report.getField(SampleCounters::kSampledWritesBytesFieldName).Long(), size); +static const std::string kCurrentOpDescFieldValue{"query analyzer"}; + +class QueryAnalysisSampleCountersTest : public ServiceContextTest { +public: + /** + * Run through sample counters refreshConfiguration and increment functions depending on + * whether process is mongod or mongos. + */ + void testRefreshConfigIncrementAndReport(bool isMongos); + +protected: + const NamespaceString nss0 = + NamespaceString::createNamespaceString_forTest("testDb", "testColl0"); + const NamespaceString nss1 = + NamespaceString::createNamespaceString_forTest("testDb", "testColl1"); + const UUID collUuid0 = UUID::gen(); + const UUID collUuid1 = UUID::gen(); + +private: + bool _originalIsMongos; +}; + +void QueryAnalysisSampleCountersTest::testRefreshConfigIncrementAndReport(const bool testMongos) { + bool originalIsMongos = isMongos(); + setMongos(testMongos); + ON_BLOCK_EXIT([&] { setMongos(originalIsMongos); }); + + const bool testMongod = !testMongos; + + const double sampleRate0 = 0.0; + const double sampleRate1Before = 0.0000000001; + const double sampleRate1After = 222.2; + + // The mock size for each sampled read or write query, in bytes. + const int64_t sampledQueryDocSizeBytes = 10; + + auto svcCtx = getServiceContext(); + QueryAnalysisSampleCounters& sampleCounters = QueryAnalysisSampleCounters::get(svcCtx); + + // Add first configuration and refresh. + std::vector<CollectionQueryAnalyzerConfiguration> configurationsV1; + configurationsV1.emplace_back(nss0, collUuid0, sampleRate0); + sampleCounters.refreshConfigurations(configurationsV1); + + // Verify currentOp, one configuration. 
+ std::vector<BSONObj> ops; + sampleCounters.reportForCurrentOp(&ops); + ASSERT_EQ(1, ops.size()); + auto parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[0]); + ASSERT_EQ(parsedOp.getDesc(), kCurrentOpDescFieldValue); + ASSERT_EQ(parsedOp.getNs(), nss0); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid0); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate0); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 0); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 0); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), 0L); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), 0L); + } + + // Verify server status, one configuration. + BSONObj serverStatus; + serverStatus = sampleCounters.reportForServerStatus(); + auto parsedServerStatus = QueryAnalysisServerStatus::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + serverStatus); + ASSERT_EQ(parsedServerStatus.getActiveCollections(), 1); + ASSERT_EQ(parsedServerStatus.getTotalCollections(), 1); + ASSERT_EQ(parsedServerStatus.getTotalSampledReadsCount(), 0); + ASSERT_EQ(parsedServerStatus.getTotalSampledWritesCount(), 0); + if (testMongod) { + ASSERT_EQ(*parsedServerStatus.getTotalSampledReadsBytes(), 0L); + ASSERT_EQ(*parsedServerStatus.getTotalSampledWritesBytes(), 0L); + } + + // Add second configuration and refresh. + std::vector<CollectionQueryAnalyzerConfiguration> configurationsV2; + configurationsV2.emplace_back(nss0, collUuid0, sampleRate0); + configurationsV2.emplace_back(nss1, collUuid1, sampleRate1Before); + sampleCounters.refreshConfigurations(configurationsV2); + + // Verify currentOp, two configurations. 
+ ops.clear(); + sampleCounters.reportForCurrentOp(&ops); + ASSERT_EQ(2, ops.size()); + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[0]); + ASSERT_EQ(parsedOp.getDesc(), kCurrentOpDescFieldValue); + ASSERT_EQ(parsedOp.getNs(), nss0); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid0); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate0); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 0); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 0); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), 0L); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), 0L); + } + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[1]); + ASSERT_EQ(parsedOp.getNs(), nss1); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid1); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate1Before); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 0); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 0); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), 0L); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), 0L); + } + + // Verify server status, two configurations. + serverStatus = sampleCounters.reportForServerStatus(); + parsedServerStatus = QueryAnalysisServerStatus::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + serverStatus); + ASSERT_EQ(parsedServerStatus.getActiveCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalSampledReadsCount(), 0); + ASSERT_EQ(parsedServerStatus.getTotalSampledWritesCount(), 0); + if (testMongod) { + ASSERT_EQ(*parsedServerStatus.getTotalSampledReadsBytes(), 0L); + ASSERT_EQ(*parsedServerStatus.getTotalSampledWritesBytes(), 0L); + } + + // Modify second configuration and refresh. 
+ std::vector<CollectionQueryAnalyzerConfiguration> configurationsV3; + configurationsV3.emplace_back(nss0, collUuid0, sampleRate0); + configurationsV3.emplace_back(nss1, collUuid1, sampleRate1After); + sampleCounters.refreshConfigurations(configurationsV3); + + // Increment read and write counters. + if (testMongos) { + sampleCounters.incrementReads(nss0, collUuid0); + sampleCounters.incrementWrites(nss0, collUuid0); + sampleCounters.incrementReads(nss1, collUuid1); + sampleCounters.incrementReads(nss1, collUuid1); + sampleCounters.incrementWrites(nss1, collUuid1); + } + if (testMongod) { + sampleCounters.incrementReads(nss0, collUuid0, sampledQueryDocSizeBytes); + sampleCounters.incrementWrites(nss0, collUuid0, sampledQueryDocSizeBytes); + sampleCounters.incrementReads(nss1, collUuid1, sampledQueryDocSizeBytes); + sampleCounters.incrementReads(nss1, collUuid1, sampledQueryDocSizeBytes); + sampleCounters.incrementWrites(nss1, collUuid1, sampledQueryDocSizeBytes); + } + + // Verify currentOp, two configurations, updated sample rate. 
+ ops.clear(); + sampleCounters.reportForCurrentOp(&ops); + ASSERT_EQ(2, ops.size()); + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[0]); + ASSERT_EQ(parsedOp.getNs(), nss0); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid0); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate0); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 1); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 1); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), sampledQueryDocSizeBytes); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), sampledQueryDocSizeBytes); + } + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[1]); + ASSERT_EQ(parsedOp.getNs(), nss1); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid1); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate1After); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 2); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 1); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), 2 * sampledQueryDocSizeBytes); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), sampledQueryDocSizeBytes); + } + + // Verify server status, two configurations. 
+ serverStatus = sampleCounters.reportForServerStatus(); + parsedServerStatus = QueryAnalysisServerStatus::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + serverStatus); + ASSERT_EQ(parsedServerStatus.getActiveCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalSampledReadsCount(), 3); + ASSERT_EQ(parsedServerStatus.getTotalSampledWritesCount(), 2); + if (testMongod) { + ASSERT_EQ(*parsedServerStatus.getTotalSampledReadsBytes(), 3 * sampledQueryDocSizeBytes); + ASSERT_EQ(*parsedServerStatus.getTotalSampledWritesBytes(), 2 * sampledQueryDocSizeBytes); + } + + // Second configuration becomes inactive. + std::vector<CollectionQueryAnalyzerConfiguration> configurationsV4; + configurationsV4.emplace_back(nss0, collUuid0, sampleRate0); + sampleCounters.refreshConfigurations(configurationsV4); + + // Verify currentOp, one remaining configuration. + ops.clear(); + sampleCounters.reportForCurrentOp(&ops); + ASSERT_EQ(1, ops.size()); + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[0]); + ASSERT_EQ(parsedOp.getNs(), nss0); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid0); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate0); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 1); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 1); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), sampledQueryDocSizeBytes); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), sampledQueryDocSizeBytes); + } + + // Verify server status, one remaining configuration. 
+ serverStatus = sampleCounters.reportForServerStatus(); + parsedServerStatus = QueryAnalysisServerStatus::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + serverStatus); + ASSERT_EQ(parsedServerStatus.getActiveCollections(), 1); + ASSERT_EQ(parsedServerStatus.getTotalCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalSampledReadsCount(), 3); + ASSERT_EQ(parsedServerStatus.getTotalSampledWritesCount(), 2); + if (testMongod) { + ASSERT_EQ(*parsedServerStatus.getTotalSampledReadsBytes(), 3 * sampledQueryDocSizeBytes); + ASSERT_EQ(*parsedServerStatus.getTotalSampledWritesBytes(), 2 * sampledQueryDocSizeBytes); + } + + // Second configuration becomes active again + std::vector<CollectionQueryAnalyzerConfiguration> configurationsV5; + configurationsV5.emplace_back(nss0, collUuid0, sampleRate0); + configurationsV5.emplace_back(nss1, collUuid1, sampleRate1After); + sampleCounters.refreshConfigurations(configurationsV5); + + // Verify currentOp, two configurations, updated sample rate. 
+ ops.clear(); + sampleCounters.reportForCurrentOp(&ops); + ASSERT_EQ(2, ops.size()); + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[0]); + ASSERT_EQ(parsedOp.getDesc(), kCurrentOpDescFieldValue); + ASSERT_EQ(parsedOp.getNs(), nss0); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid0); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate0); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 1); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 1); + if (testMongod) { + ASSERT_EQ(parsedOp.getSampledReadsBytes(), sampledQueryDocSizeBytes); + ASSERT_EQ(parsedOp.getSampledWritesBytes(), sampledQueryDocSizeBytes); + } + parsedOp = CollectionSampleCountersCurrentOp::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + ops[1]); + ASSERT_EQ(parsedOp.getDesc(), kCurrentOpDescFieldValue); + ASSERT_EQ(parsedOp.getNs(), nss1); + ASSERT_EQ(parsedOp.getCollUuid(), collUuid1); + if (testMongos) { + ASSERT_EQ(parsedOp.getSampleRate(), sampleRate1After); + } + ASSERT_EQ(parsedOp.getSampledReadsCount(), 0); + ASSERT_EQ(parsedOp.getSampledWritesCount(), 0); + if (testMongod) { + ASSERT_EQ(*(parsedOp.getSampledReadsBytes()), 0L); + ASSERT_EQ(*(parsedOp.getSampledWritesBytes()), 0L); + } + + // Verify server status, two configurations. 
+ serverStatus = sampleCounters.reportForServerStatus(); + parsedServerStatus = QueryAnalysisServerStatus::parse( + IDLParserContext("QueryAnalysisSampleCountersTest.RefreshConfigIncrementAndReport_TEST"), + serverStatus); + ASSERT_EQ(parsedServerStatus.getActiveCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalCollections(), 2); + ASSERT_EQ(parsedServerStatus.getTotalSampledReadsCount(), 3); + ASSERT_EQ(parsedServerStatus.getTotalSampledWritesCount(), 2); + if (testMongod) { + ASSERT_EQ(*parsedServerStatus.getTotalSampledReadsBytes(), 3 * sampledQueryDocSizeBytes); + ASSERT_EQ(*parsedServerStatus.getTotalSampledWritesBytes(), 2 * sampledQueryDocSizeBytes); + } +} + +TEST_F(QueryAnalysisSampleCountersTest, RefreshConfigIncrementAndReportMongos) { + testRefreshConfigIncrementAndReport(true); +} + +TEST_F(QueryAnalysisSampleCountersTest, RefreshConfigIncrementAndReportMongod) { + testRefreshConfigIncrementAndReport(false); } } // namespace diff --git a/src/mongo/s/query_analysis_sampler.cpp b/src/mongo/s/query_analysis_sampler.cpp index 29d699f9f0d..5fb0bb5e316 100644 --- a/src/mongo/s/query_analysis_sampler.cpp +++ b/src/mongo/s/query_analysis_sampler.cpp @@ -35,6 +35,7 @@ #include "mongo/logv2/log.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" +#include "mongo/s/query_analysis_sample_counters.h" #include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h" #include "mongo/util/net/socket_utils.h" @@ -46,6 +47,7 @@ namespace analyze_shard_key { namespace { MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisSampler); +MONGO_FAIL_POINT_DEFINE(overwriteQueryAnalysisSamplerAvgLastCountToZero); const auto getQueryAnalysisSampler = ServiceContext::declareDecoration<QueryAnalysisSampler>(); @@ -150,6 +152,10 @@ double QueryAnalysisSampler::SampleRateLimiter::_getBurstCapacity(double numToke void QueryAnalysisSampler::SampleRateLimiter::_refill(double numTokensPerSecond, double burstCapacity) { + if (numTokensPerSecond == 0.0) { + return; + } + auto 
currTicks = _serviceContext->getTickSource()->getTicks(); double numSecondsElapsed = _serviceContext->getTickSource() ->ticksTo<Nanoseconds>(currTicks - _lastRefillTimeTicks) @@ -220,7 +226,9 @@ void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) { boost::optional<double> lastAvgCount; { stdx::lock_guard<Latch> lk(_mutex); - lastAvgCount = _queryStats.getLastAvgCount(); + lastAvgCount = (MONGO_unlikely(overwriteQueryAnalysisSamplerAvgLastCountToZero.shouldFail()) + ? 0 + : _queryStats.getLastAvgCount()); } if (!lastAvgCount) { @@ -267,6 +275,8 @@ void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) { "response"_attr = response); } + QueryAnalysisSampleCounters::get(opCtx).refreshConfigurations(response.getConfigurations()); + stdx::lock_guard<Latch> lk(_mutex); std::map<NamespaceString, SampleRateLimiter> sampleRateLimiters; @@ -295,33 +305,27 @@ void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) { _sampleRateLimiters = std::move(sampleRateLimiters); } -void QueryAnalysisSampler::SampleRateLimiter::incrementCounters( - const SampledCommandNameEnum cmdName) { +void QueryAnalysisSampler::_incrementCounters(OperationContext* opCtx, + const NamespaceString& nss, + const SampledCommandNameEnum cmdName) { switch (cmdName) { case SampledCommandNameEnum::kFind: case SampledCommandNameEnum::kAggregate: case SampledCommandNameEnum::kCount: case SampledCommandNameEnum::kDistinct: - _counters.incrementReads(boost::none); + QueryAnalysisSampleCounters::get(opCtx).incrementReads(nss); break; case SampledCommandNameEnum::kInsert: case SampledCommandNameEnum::kUpdate: case SampledCommandNameEnum::kDelete: case SampledCommandNameEnum::kFindAndModify: - _counters.incrementWrites(boost::none); + QueryAnalysisSampleCounters::get(opCtx).incrementWrites(nss); break; default: MONGO_UNREACHABLE; } } -BSONObj QueryAnalysisSampler::SampleRateLimiter::reportForCurrentOp() const { - BSONObjBuilder bob = 
_counters.reportCurrentOp(); - bob.append(SampleCounters::kSampleRateFieldName, _numTokensPerSecond); - BSONObj obj = bob.obj(); - return obj; -} - boost::optional<UUID> QueryAnalysisSampler::tryGenerateSampleId(OperationContext* opCtx, const NamespaceString& nss, SampledCommandNameEnum cmdName) { @@ -340,7 +344,7 @@ boost::optional<UUID> QueryAnalysisSampler::tryGenerateSampleId(OperationContext auto& rateLimiter = it->second; if (rateLimiter.tryConsume()) { - rateLimiter.incrementCounters(cmdName); + _incrementCounters(opCtx, nss, cmdName); return UUID::gen(); } return boost::none; @@ -351,11 +355,5 @@ void QueryAnalysisSampler::appendInfoForServerStatus(BSONObjBuilder* bob) const bob->append(kActiveCollectionsFieldName, static_cast<int>(_sampleRateLimiters.size())); } -void QueryAnalysisSampler::reportForCurrentOp(std::vector<BSONObj>* ops) const { - for (auto it = _sampleRateLimiters.begin(); it != _sampleRateLimiters.end(); ++it) { - ops->push_back(it->second.reportForCurrentOp()); - } -} - } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/s/query_analysis_sampler.h b/src/mongo/s/query_analysis_sampler.h index 08455fb2ef2..ea0be74f0e1 100644 --- a/src/mongo/s/query_analysis_sampler.h +++ b/src/mongo/s/query_analysis_sampler.h @@ -104,9 +104,8 @@ public: : _serviceContext(serviceContext), _nss(nss), _collUuid(collUuid), - _numTokensPerSecond(numTokensPerSecond), - _counters(nss, collUuid) { - invariant(_numTokensPerSecond > 0); + _numTokensPerSecond(numTokensPerSecond) { + invariant(_numTokensPerSecond >= 0); _lastRefillTimeTicks = _serviceContext->getTickSource()->getTicks(); }; @@ -144,10 +143,6 @@ public: */ void refreshRate(double numTokensPerSecond); - void incrementCounters(SampledCommandNameEnum cmdName); - - BSONObj reportForCurrentOp() const; - private: /** * Returns the maximum of number of tokens that a bucket with given rate can store at any @@ -169,8 +164,6 @@ public: // The bucket is only refilled when there is a consume 
request or a rate refresh. TickSource::Tick _lastRefillTimeTicks; double _lastNumTokens = 0; - - SampleCounters _counters; }; QueryAnalysisSampler() = default; @@ -202,8 +195,6 @@ public: void appendInfoForServerStatus(BSONObjBuilder* bob) const; - void reportForCurrentOp(std::vector<BSONObj>* ops) const; - void refreshQueryStatsForTest() { _refreshQueryStats(); } @@ -229,6 +220,10 @@ private: void _refreshConfigurations(OperationContext* opCtx); + void _incrementCounters(OperationContext* opCtx, + const NamespaceString& nss, + SampledCommandNameEnum cmdName); + mutable Mutex _mutex = MONGO_MAKE_LATCH("QueryAnalysisSampler::_mutex"); PeriodicJobAnchor _periodicQueryStatsRefresher; diff --git a/src/mongo/s/query_analysis_sampler_test.cpp b/src/mongo/s/query_analysis_sampler_test.cpp index 57225f6867a..97b432a8303 100644 --- a/src/mongo/s/query_analysis_sampler_test.cpp +++ b/src/mongo/s/query_analysis_sampler_test.cpp @@ -153,10 +153,6 @@ protected: const UUID collUuid = UUID::gen(); }; -DEATH_TEST_F(QueryAnalysisSamplerRateLimiterTest, CannotUseZeroRate, "invariant") { - QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0); -} - DEATH_TEST_F(QueryAnalysisSamplerRateLimiterTest, CannotUseNegativeRate, "invariant") { QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, -0.5); } @@ -1001,188 +997,6 @@ TEST_F(QueryAnalysisSamplerTest, RefreshConfigurationsNewCollectionUuid) { ASSERT_FALSE(sampler.tryGenerateSampleId(opCtx, nss0, SampledCommandNameEnum::kFind)); } -TEST_F(QueryAnalysisSamplerRateLimiterTest, ReportForCurrentOp) { - auto originalIsMongos = isMongos(); - setMongos(true); - ON_BLOCK_EXIT([&] { setMongos(originalIsMongos); }); - - const double rate = 1000.0; - auto rateLimiter = - QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, rate); - - long long expectedReadsCount = 0; - long long expectedWritesCount = 0; - - rateLimiter.incrementCounters(SampledCommandNameEnum::kFind); - 
++expectedReadsCount; - - BSONObj report = rateLimiter.reportForCurrentOp(); - - ASSERT_EQ(report.getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(report.getField(SampleCounters::kNamespaceStringFieldName).String(), nss.toString()); - ASSERT_EQ(UUID::parse(report.getField(SampleCounters::kCollUuidFieldName)), collUuid); - ASSERT_EQ(report.getField(SampleCounters::kSampleRateFieldName).Double(), rate); - ASSERT_EQ(report.getField(SampleCounters::kSampledReadsCountFieldName).Long(), - expectedReadsCount); - ASSERT_EQ(report.getField(SampleCounters::kSampledWritesCountFieldName).Long(), - expectedWritesCount); - ASSERT(!report.hasField(SampleCounters::kSampledReadsBytesFieldName)); - ASSERT(!report.hasField(SampleCounters::kSampledWritesBytesFieldName)); - - rateLimiter.incrementCounters(SampledCommandNameEnum::kUpdate); - ++expectedWritesCount; - - report = rateLimiter.reportForCurrentOp(); - - ASSERT_EQ(report.getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(report.getField(SampleCounters::kNamespaceStringFieldName).String(), nss.toString()); - ASSERT_EQ(UUID::parse(report.getField(SampleCounters::kCollUuidFieldName)), collUuid); - ASSERT_EQ(report.getField(SampleCounters::kSampleRateFieldName).Double(), rate); - ASSERT_EQ(report.getField(SampleCounters::kSampledReadsCountFieldName).Long(), - expectedReadsCount); - ASSERT_EQ(report.getField(SampleCounters::kSampledWritesCountFieldName).Long(), - expectedWritesCount); - ASSERT(!report.hasField(SampleCounters::kSampledReadsBytesFieldName)); - ASSERT(!report.hasField(SampleCounters::kSampledWritesBytesFieldName)); -} - -TEST_F(QueryAnalysisSamplerTest, ReportForCurrentOp) { - auto originalIsMongos = isMongos(); - setMongos(true); - ON_BLOCK_EXIT([&] { setMongos(originalIsMongos); }); - - const double rate = 1000.0; - transport::TransportLayerMock transportLayer; - 
std::shared_ptr<transport::Session> session = transportLayer.createSession(); - auto client = - getGlobalServiceContext()->makeClient("RefreshConfigurationsNewCollectionUuid", session); - auto opCtxHolder = client->makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - auto& sampler = QueryAnalysisSampler::get(opCtx); - - std::vector<CollectionQueryAnalyzerConfiguration> configurations; - configurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, rate}); - setUpConfigurations(&sampler, configurations); - - advanceTime(Milliseconds(1000)); - - long long expectedReadsCount = 0; - long long expectedWritesCount = 0; - - // Sample a read query. - boost::optional<UUID> sampleUuid0 = - sampler.tryGenerateSampleId(opCtx, nss0, SampledCommandNameEnum::kFind); - ASSERT(sampleUuid0); - ++expectedReadsCount; - - std::vector<BSONObj> ops; - sampler.reportForCurrentOp(&ops); - - ASSERT_EQ(ops.size(), 1); - ASSERT_EQ(ops[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(ops[0].getField(SampleCounters::kNamespaceStringFieldName).String(), nss0.toString()); - ASSERT_EQ(UUID::parse(ops[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampleRateFieldName).Double(), rate); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), - expectedReadsCount); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), - expectedWritesCount); - - // Sample a write query - auto sampleUuid1 = sampler.tryGenerateSampleId(opCtx, nss0, SampledCommandNameEnum::kUpdate); - ASSERT(sampleUuid1); - ++expectedWritesCount; - - ops.clear(); - sampler.reportForCurrentOp(&ops); - ASSERT_EQ(ops[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(ops[0].getField(SampleCounters::kNamespaceStringFieldName).String(), nss0.toString()); - 
ASSERT_EQ(UUID::parse(ops[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampleRateFieldName).Double(), rate); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), - expectedReadsCount); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), - expectedWritesCount); -} - -TEST_F(QueryAnalysisSamplerTest, ReportForCurrentOpMultipleCollections) { - auto originalIsMongos = isMongos(); - setMongos(true); - ON_BLOCK_EXIT([&] { setMongos(originalIsMongos); }); - - const double rate = 1000.0; - transport::TransportLayerMock transportLayer; - std::shared_ptr<transport::Session> session = transportLayer.createSession(); - auto client = - getGlobalServiceContext()->makeClient("RefreshConfigurationsNewCollectionUuid", session); - auto opCtxHolder = client->makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - auto& sampler = QueryAnalysisSampler::get(opCtx); - - std::vector<CollectionQueryAnalyzerConfiguration> configOneColl, configTwoColls, - configOneCollNew; - configOneColl.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, rate}); - configTwoColls.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, rate}); - configTwoColls.push_back(CollectionQueryAnalyzerConfiguration{nss1, collUuid1, rate}); - configOneCollNew.push_back(CollectionQueryAnalyzerConfiguration{nss1, collUuid1, rate}); - - setUpConfigurations(&sampler, configOneColl); - - std::vector<BSONObj> ops; - sampler.reportForCurrentOp(&ops); - - ASSERT_EQ(ops.size(), 1); - ASSERT_EQ(ops[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(ops[0].getField(SampleCounters::kNamespaceStringFieldName).String(), nss0.toString()); - ASSERT_EQ(UUID::parse(ops[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampleRateFieldName).Double(), rate); - 
ASSERT_EQ(ops[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 0); - - setUpConfigurations(&sampler, configTwoColls); - - ops.clear(); - sampler.reportForCurrentOp(&ops); - - ASSERT_EQ(ops.size(), 2); - ASSERT_EQ(ops[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(ops[0].getField(SampleCounters::kNamespaceStringFieldName).String(), nss0.toString()); - ASSERT_EQ(UUID::parse(ops[0].getField(SampleCounters::kCollUuidFieldName)), collUuid0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampleRateFieldName).Double(), rate); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 0); - - ASSERT_EQ(ops[1].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(ops[1].getField(SampleCounters::kNamespaceStringFieldName).String(), nss1.toString()); - ASSERT_EQ(UUID::parse(ops[1].getField(SampleCounters::kCollUuidFieldName)), collUuid1); - ASSERT_EQ(ops[1].getField(SampleCounters::kSampleRateFieldName).Double(), rate); - ASSERT_EQ(ops[1].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 0); - ASSERT_EQ(ops[1].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 0); - - setUpConfigurations(&sampler, configOneCollNew); - - ops.clear(); - sampler.reportForCurrentOp(&ops); - - ASSERT_EQ(ops.size(), 1); - ASSERT_EQ(ops[0].getField(SampleCounters::kDescriptionFieldName).String(), - SampleCounters::kDescriptionFieldValue); - ASSERT_EQ(ops[0].getField(SampleCounters::kNamespaceStringFieldName).String(), nss1.toString()); - ASSERT_EQ(UUID::parse(ops[0].getField(SampleCounters::kCollUuidFieldName)), collUuid1); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampleRateFieldName).Double(), rate); - 
ASSERT_EQ(ops[0].getField(SampleCounters::kSampledReadsCountFieldName).Long(), 0); - ASSERT_EQ(ops[0].getField(SampleCounters::kSampledWritesCountFieldName).Long(), 0); -} - } // namespace } // namespace analyze_shard_key } // namespace mongo diff --git a/src/mongo/s/query_analysis_server_status.cpp b/src/mongo/s/query_analysis_server_status.cpp index f721b0badf9..42f80660f4f 100644 --- a/src/mongo/s/query_analysis_server_status.cpp +++ b/src/mongo/s/query_analysis_server_status.cpp @@ -47,13 +47,9 @@ public: BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const override { - if (!supportsSamplingQueries()) { - return {}; - } - - BSONObjBuilder builder; - QueryAnalysisSampler::get(opCtx).appendInfoForServerStatus(&builder); - return builder.obj(); + return supportsSamplingQueries() + ? QueryAnalysisSampleCounters::get(opCtx).reportForServerStatus() + : BSONObj(); } } queryAnalysisServerStatus; |