diff options
-rw-r--r-- | jstests/noPassthrough/telemetry_sharded_on_mongod.js | 166 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregate_command.idl | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/query/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/query/find_command.idl | 6 | ||||
-rw-r--r-- | src/mongo/db/query/telemetry.cpp | 57 | ||||
-rw-r--r-- | src/mongo/db/query/telemetry.h | 22 | ||||
-rw-r--r-- | src/mongo/db/query/telemetry.idl | 49 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 4 |
10 files changed, 341 insertions, 6 deletions
diff --git a/jstests/noPassthrough/telemetry_sharded_on_mongod.js b/jstests/noPassthrough/telemetry_sharded_on_mongod.js new file mode 100644 index 00000000000..7c8c164c7b5 --- /dev/null +++ b/jstests/noPassthrough/telemetry_sharded_on_mongod.js @@ -0,0 +1,166 @@ +// Tests that queries from mongos to mongod record telemetry correctly on mongod. +// Does not test any mongos logic on the telemetry read path. +// @tags: [requires_sharding, requires_fcv_63] +// +load('jstests/libs/analyze_plan.js'); +load("jstests/libs/feature_flag_util.js"); + +(function() { +"use strict"; + +let options = { + setParameter: {internalQueryConfigureTelemetrySamplingRate: 2147483647}, +}; + +const conn = MongoRunner.runMongod(options); +let db = conn.getDB(jsTestName()); +if (!FeatureFlagUtil.isEnabled(db, "Telemetry")) { + jsTestLog("Skipping test as featureFlagTelemetry is not enabled"); + return; +} + +let coll = db[jsTestName()]; +coll.drop(); + +// Insert documents. +let documents = []; +for (let i = 0; i < 10; i++) { + documents.push({_id: i, val: i * 5}); +} + +coll.insert(documents); + +// Run a query that pretends to be from mongos. For the purposes of this test we don't care about +// the results. If the command worked, it should show up in the telemetry store. + +const queryHashForTestBinA = new BinData(5, "Tfo0EBc0wbGscAbGuj7goA=="); +assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$match: {_id: 5}}, {$project: {val: false}}], + fromMongos: true, + needsMerge: true, + hashedTelemetryKey: {queryHash: queryHashForTestBinA, hostAndPort: "localHost:1"}, + cursor: {}, +})); + +const queryHashForTestBinB = new BinData(5, "d4Tk/HbrrGSpWrib6hY+IQ=="); +// Do the same, but a find command. +assert.commandWorked(db.runCommand({ + find: coll.getName(), + filter: {_id: 5}, + sort: {_id: 1}, + projection: {val: false}, + hashedTelemetryKey: {queryHash: queryHashForTestBinB, hostAndPort: "localHost:1"}, +})); + +// Check the telemetry store. 
+const telemetryResult = assert.commandWorked( + db.adminCommand({aggregate: 1, pipeline: [{$telemetry: {}}, {$sort: {"key": 1}}], cursor: {}})); +// Assert that the telemetry object has the "hashed key" as a key as expected and not the query +// shape for both commands. +let telemetryResultBatch = telemetryResult.cursor.firstBatch; +assert(telemetryResultBatch[0].key.hasOwnProperty("queryHash"), tojson(telemetryResultBatch[0])); +assert.eq( + telemetryResultBatch[0].key.queryHash, queryHashForTestBinA, tojson(telemetryResultBatch[0])); +assert(telemetryResultBatch[1].key.hasOwnProperty("queryHash"), tojson(telemetryResultBatch[1])); +assert.eq( + telemetryResultBatch[1].key.queryHash, queryHashForTestBinB, tojson(telemetryResultBatch[1])); + +MongoRunner.stopMongod(conn); + +// Spin up a sharded cluster to test end to end. +const st = new ShardingTest({ + mongos: 1, + shards: 1, + config: 1, + rs: {nodes: 1}, + rsOptions: options, + shardOptions: options, + mongosOptions: options, +}); + +const mongos = st.s; +db = mongos.getDB(jsTestName()); +assert(FeatureFlagUtil.isEnabled(db, "Telemetry"), "featureFlagTelemetry not enabled"); +coll = db[jsTestName()]; +coll.insert(documents); + +// Helper function to run the test regardless of the sharding state of the collection. +function runTestOnDbAndColl(funcDb, funcColl, funcShardTest) { + // Run an aggregation command. + assert.commandWorked(funcDb.runCommand({ + aggregate: funcColl.getName(), + pipeline: [{$match: {_id: 5}}, {$project: {val: false}}], + cursor: {}, + })); + + // Run a find command. + assert.commandWorked(funcDb.runCommand({ + find: funcColl.getName(), + filter: {_id: 5}, + sort: {_id: 1}, + projection: {val: false}, + })); + + const mongod = funcShardTest.getPrimaryShard(funcDb.getName()); + const shardDB = mongod.getDB(jsTestName()); + // Check that these commands generated something in the shard telemetry store. 
+ const shardedTelemetryResult = assert.commandWorked(shardDB.adminCommand({ + aggregate: 1, + pipeline: [ + {$telemetry: {}}, + {$match: {"key.queryHash": {"$exists": true}}}, + {$sort: {"key.queryHash": 1}} + ], + cursor: {} + })); + + let telemetryResultBatch = shardedTelemetryResult.cursor.firstBatch; + // We can't check the value of hostAndPort as we may be assigned a different value each run. The + // hash should be deterministic. + // Note that this test is for checking mongod, not mongos. We are not checking whether mongos + // correctly aggregates results, just that mongod has a hashed key. + assert(telemetryResultBatch[0].key.hasOwnProperty("queryHash"), + tojson(telemetryResultBatch[0])); + assert.eq(telemetryResultBatch[0].key.queryHash, + queryHashForTestBinA, + tojson(telemetryResultBatch[0])); + assert(telemetryResultBatch[1].key.hasOwnProperty("queryHash"), + tojson(telemetryResultBatch[1])); + assert.eq(telemetryResultBatch[1].key.queryHash, + queryHashForTestBinB, + tojson(telemetryResultBatch[1])); +} +runTestOnDbAndColl(db, coll, st); + +// Restart the cluster to clear the telemetry store. +st.stop(); + +// We're going to shard the collection, so have two shards this time. +const shardedST = new ShardingTest({ + mongos: 1, + shards: 2, + config: 1, + rs: {nodes: 1}, + rsOptions: options, + mongosOptions: options, +}); + +const shardedMongos = shardedST.s; +db = shardedMongos.getDB(jsTestName()); +assert(FeatureFlagUtil.isEnabled(db, "Telemetry"), "featureFlagTelemetry not enabled"); +assert.commandWorked(db.adminCommand({enableSharding: db.getName()})); +coll = db[jsTestName()]; +coll.insert(documents); +assert.commandWorked( + shardedMongos.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}})); +// Ensure docs on each shard. 
+assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 5}})); +assert.commandWorked(shardedMongos.adminCommand( + {moveChunk: coll.getFullName(), find: {_id: 3}, to: shardedST.shard0.shardName})); +assert.commandWorked(shardedMongos.adminCommand( + {moveChunk: coll.getFullName(), find: {_id: 7}, to: shardedST.shard1.shardName})); + +runTestOnDbAndColl(db, coll, shardedST); +shardedST.stop(); +}()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 9203a19806c..c4361f13557 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -61,13 +61,11 @@ env.Library( ) env.Library( - target='aggregation_request_helper', - source=[ + target='aggregation_request_helper', source=[ 'aggregation_request_helper.cpp', 'aggregate_command.idl', 'external_data_source_option.idl', - ], - LIBDEPS=[ + ], LIBDEPS=[ '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/query/common_query_enums_and_helpers', @@ -77,8 +75,9 @@ env.Library( '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/db/write_concern_options', 'document_sources_idl', - ], -) + ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/query/telemetry_idl', + ]) env.Library( target='variable_validation', @@ -178,6 +177,7 @@ env.Library( 'sharded_agg_helpers.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/query/op_metrics', '$BUILD_DIR/mongo/s/async_requests_sender', '$BUILD_DIR/mongo/s/query/cluster_query', 'aggregation', diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index 14f13720102..8cb7e6c6cef 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -43,6 +43,7 @@ imports: - "mongo/db/pipeline/legacy_runtime_constants.idl" - "mongo/db/query/hint.idl" - "mongo/db/query/cursor_response.idl" + - "mongo/db/query/telemetry.idl" - 
"mongo/db/write_concern_options.idl" types: @@ -309,3 +310,9 @@ commands: type: optionalBool cpp_name: isClusterQueryWithoutShardKeyCmd stability: internal + hashedTelemetryKey: + description: "For sharded queries the telemetry key is calculated on mongos instead of from the full query shape locally" + type: ShardedTelemetryStoreKey + optional: true + stability: internal + diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index b4f16cb7dde..dfa0e5452b3 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -51,6 +51,7 @@ #include "mongo/db/pipeline/search_helper.h" #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/db/query/cursor_response_gen.h" +#include "mongo/db/query/telemetry.h" #include "mongo/db/vector_clock.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -65,6 +66,7 @@ #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" +#include "mongo/util/net/socket_utils.h" #include "mongo/util/overloaded_visitor.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -738,6 +740,11 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { } } +void setTelemetryKeyOnAggRequest(AggregateCommandRequest& request, ExpressionContext* expCtx) { + request.setHashedTelemetryKey(telemetry::telemetryKeyToShardedStoreId( + telemetry::getTelemetryKeyFromOpCtx(expCtx->opCtx), getHostNameCachedAndPort())); +} + } // namespace std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( @@ -778,6 +785,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( LiteParsedPipeline liteParsedPipeline(aggRequest); auto hasChangeStream = liteParsedPipeline.hasChangeStream(); auto startsWithDocuments = liteParsedPipeline.startsWithDocuments(); + setTelemetryKeyOnAggRequest(aggRequest, expCtx.get()); 
auto shardDispatchResults = dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, @@ -947,6 +955,9 @@ BSONObj createPassthroughCommandForShard( } } + telemetry::appendShardedTelemetryKeyIfApplicable( + targetedCmd, getHostNameCachedAndPort(), expCtx->opCtx); + auto shardCommand = genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, @@ -1482,6 +1493,7 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) { }(); AggregateCommandRequest aggRequest(expCtx->ns, rawStages); + setTelemetryKeyOnAggRequest(aggRequest, expCtx.get()); LiteParsedPipeline liteParsedPipeline(aggRequest); auto hasChangeStream = liteParsedPipeline.hasChangeStream(); auto startsWithDocuments = liteParsedPipeline.startsWithDocuments(); diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index a2c337ce8bd..1eee27673cf 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -250,6 +250,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/server_base', 'cursor_response_idl', + 'telemetry_idl', ], ) @@ -389,6 +390,7 @@ env.Library( 'memory_util', 'query_knobs', 'rate_limiting', + 'telemetry_idl', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/auth/auth', @@ -396,10 +398,20 @@ env.Library( '$BUILD_DIR/mongo/db/prepare_conflict_tracker', '$BUILD_DIR/mongo/db/stats/resource_consumption_metrics', '$BUILD_DIR/mongo/util/concurrency/admission_context', + '$BUILD_DIR/mongo/util/md5', '$BUILD_DIR/mongo/util/namespace_string_database_name_util', ], ) +env.Library( + target='telemetry_idl', + source=['telemetry.idl'], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + "$BUILD_DIR/mongo/idl/idl_parser", + ], +) + env.CppUnitTest( target="db_query_test", source=[ diff --git a/src/mongo/db/query/find_command.idl b/src/mongo/db/query/find_command.idl index c7a157dc810..b43ef171c4d 100644 --- a/src/mongo/db/query/find_command.idl +++ b/src/mongo/db/query/find_command.idl @@ -44,6 +44,7 @@ imports: - 
"mongo/db/basic_types.idl" - "mongo/db/query/cursor_response.idl" - "mongo/db/query/hint.idl" + - "mongo/db/query/telemetry.idl" types: boolNoOpSerializer: @@ -248,4 +249,9 @@ commands: type: uuid optional: true stability: unstable + hashedTelemetryKey: + description: "For sharded queries the telemetry key is calculated on mongos instead of from the full query shape locally" + type: ShardedTelemetryStoreKey + optional: true + stability: internal diff --git a/src/mongo/db/query/telemetry.cpp b/src/mongo/db/query/telemetry.cpp index 78d7511cc6d..13471a49c9e 100644 --- a/src/mongo/db/query/telemetry.cpp +++ b/src/mongo/db/query/telemetry.cpp @@ -44,7 +44,9 @@ #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/util/assert_util.h" +#include "mongo/util/md5.hpp" #include "mongo/util/system_clock_source.h" +#include <array> #include <optional> #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -53,10 +55,50 @@ namespace mongo { namespace telemetry { +// This is defined by IDL in the find and aggregate command, but we don't want to pull in those +// files/libraries here. Instead define here as well. 
+static const std::string kTelemetryKeyInShardedCommand = "hashedTelemetryKey"; + bool isTelemetryEnabled() { return feature_flags::gFeatureFlagTelemetry.isEnabledAndIgnoreFCV(); } +ShardedTelemetryStoreKey telemetryKeyToShardedStoreId(const BSONObj& key, std::string hostAndPort) { + md5digest finishedMD5; + std::array<unsigned char, 16> md5Bin; + md5(key.objdata(), 16, finishedMD5); + std::copy(std::begin(finishedMD5), std::end(finishedMD5), std::begin(md5Bin)); + return ShardedTelemetryStoreKey(hostAndPort, md5Bin); +} + +BSONObj getTelemetryKeyFromOpCtx(OperationContext* opCtx) { + return CurOp::get(opCtx)->debug().telemetryStoreKey; +} + +void appendShardedTelemetryKeyIfApplicable(MutableDocument& objToModify, + std::string hostAndPort, + OperationContext* opCtx) { + auto telemetryKey = getTelemetryKeyFromOpCtx(opCtx); + if (telemetryKey.isEmpty()) { + return; + } + objToModify.addField( + kTelemetryKeyInShardedCommand, + Value(telemetryKeyToShardedStoreId(telemetryKey, hostAndPort).toBSON())); +} + +void appendShardedTelemetryKeyIfApplicable(BSONObjBuilder& objToModify, + std::string hostAndPort, + OperationContext* opCtx) { + auto telemetryKey = getTelemetryKeyFromOpCtx(opCtx); + if (telemetryKey.isEmpty()) { + return; + } + objToModify.append(kTelemetryKeyInShardedCommand, + telemetryKeyToShardedStoreId(telemetryKey, hostAndPort).toBSON()); +} + + namespace { CounterMetric telemetryEvictedMetric("telemetry.numEvicted"); @@ -395,7 +437,14 @@ void registerAggRequest(const AggregateCommandRequest& request, OperationContext if (!shouldCollect(opCtx->getServiceContext())) { return; } + if (auto hashKey = request.getHashedTelemetryKey()) { + // The key is in the command request in "hashedTelemetryKey". + CurOp::get(opCtx)->debug().telemetryStoreKey = hashKey->toBSON(); + CurOp::get(opCtx)->debug().shouldRecordTelemetry = true; + return; + } + // On standalone build the key from the request. 
BSONObjBuilder telemetryKey; BSONObjBuilder pipelineBuilder = telemetryKey.subarrayStart("pipeline"_sd); try { @@ -436,6 +485,13 @@ void registerFindRequest(const FindCommandRequest& request, return; } + if (auto hashKey = request.getHashedTelemetryKey()) { + // The key is in the command request in "hashedTelemetryKey". + CurOp::get(opCtx)->debug().telemetryStoreKey = hashKey->toBSON(); + CurOp::get(opCtx)->debug().shouldRecordTelemetry = true; + return; + } + BSONObjBuilder telemetryKey; try { // Serialize the request. @@ -505,6 +561,7 @@ void collectTelemetry(OperationContext* opCtx, const OpDebug& opDebug) { if (!opDebug.shouldRecordTelemetry) { return; } + auto&& metrics = LockedMetrics::get(opCtx, opDebug.telemetryStoreKey); metrics->docsReturned.aggregate(opDebug.nreturned); metrics->docsScanned.aggregate(opDebug.additiveMetrics.docsExamined.value_or(0)); diff --git a/src/mongo/db/query/telemetry.h b/src/mongo/db/query/telemetry.h index 91aceac1806..3f29f9ecc77 100644 --- a/src/mongo/db/query/telemetry.h +++ b/src/mongo/db/query/telemetry.h @@ -34,6 +34,7 @@ #include "mongo/db/curop.h" #include "mongo/db/query/partitioned_cache.h" #include "mongo/db/query/plan_explainer.h" +#include "mongo/db/query/telemetry_gen.h" #include "mongo/db/query/util/memory_util.h" #include "mongo/db/service_context.h" #include <cstdint> @@ -53,6 +54,27 @@ using BSONNumeric = long long; namespace telemetry { /** + * Generate a Telemetry Store key to be used on a shard from a Telemetry Store key that is being + * used on mongos. + */ +ShardedTelemetryStoreKey telemetryKeyToShardedStoreId(const BSONObj& key, std::string hostAndPort); +/** + * Get the telemetry query shape from the opCtx. + */ +BSONObj getTelemetryKeyFromOpCtx(OperationContext* opCtx); + +/** + * Given an object builder these functions append the telemetry key to it in the form + * for sharded commands {hostAndPort and hashedKey}. If there is no telemetry key available + * on the opCtx, does not modify the object. 
+ */ +void appendShardedTelemetryKeyIfApplicable(MutableDocument& objToModify, + std::string hostAndPort, + OperationContext* opCtx); +void appendShardedTelemetryKeyIfApplicable(BSONObjBuilder& objToModify, + std::string hostAndPort, + OperationContext* opCtx); +/** * An aggregated metric stores a compressed view of data. It balances the loss of information with * the reduction in required storage. */ diff --git a/src/mongo/db/query/telemetry.idl b/src/mongo/db/query/telemetry.idl new file mode 100644 index 00000000000..fd233c98a2a --- /dev/null +++ b/src/mongo/db/query/telemetry.idl @@ -0,0 +1,49 @@ +# Copyright(C) 2023 - present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. 
If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This IDL file describes the format of options needed for any command that can enable telemetry. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/db/basic_types.idl" + +structs: + ShardedTelemetryStoreKey: + description: "The hash of a query shape and the host:port of the originating mongos" + fields: + hostAndPort: + type: string + stability: internal + optional: false + queryHash: + type: bindata_md5 + stability: internal + optional: false +
\ No newline at end of file diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 2709b80f648..c6bf913a07d 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -47,6 +47,7 @@ #include "mongo/db/commands.h" #include "mongo/db/curop.h" #include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/fle_crud.h" #include "mongo/db/pipeline/change_stream_invalidation_info.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/canonical_query_encoder.h" @@ -71,6 +72,7 @@ #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" +#include "mongo/util/net/socket_utils.h" #include "mongo/util/scopeguard.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -208,6 +210,8 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( analyze_shard_key::appendSampleId(&cmdBuilder, *sampleId); } + telemetry::appendShardedTelemetryKeyIfApplicable( + cmdBuilder, getHostNameCachedAndPort(), opCtx); requests.emplace_back(shardId, cmdBuilder.obj()); } |