diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-10-11 18:44:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-11 19:21:37 +0000 |
commit | 526444c503085e776b3b63e356490a2912342261 (patch) | |
tree | 69a059f1af8de16bc1487341a804ada17b748833 /src/mongo/s/query_analysis_sampler.h | |
parent | 669ac94492976eff25d9cac17c43178a9b1a7ebd (diff) | |
download | mongo-526444c503085e776b3b63e356490a2912342261.tar.gz |
SERVER-70101 Implement token bucket rate limited sampling inside query analyzer
Diffstat (limited to 'src/mongo/s/query_analysis_sampler.h')
-rw-r--r-- | src/mongo/s/query_analysis_sampler.h | 89 |
1 files changed, 85 insertions, 4 deletions
diff --git a/src/mongo/s/query_analysis_sampler.h b/src/mongo/s/query_analysis_sampler.h index 2b1f5ab49f9..728267f256a 100644 --- a/src/mongo/s/query_analysis_sampler.h +++ b/src/mongo/s/query_analysis_sampler.h @@ -32,7 +32,6 @@ #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/s/analyze_shard_key_server_parameters_gen.h" -#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h" #include "mongo/util/periodic_runner.h" namespace mongo { @@ -45,6 +44,8 @@ namespace analyze_shard_key { * - The periodic background job that sends the calculated average to the coordinator to refresh the * latest configurations. The average determines the share of the cluster-wide sample rate that * will be assigned to this sampler. + * - The rate limiters, each of which controls the rate at which queries against a collection are + * sampled on this sampler. * * Currently, query sampling is only supported on a sharded cluster. So a sampler must be a mongos * and the coordinator must be the config server's primary mongod. @@ -84,6 +85,79 @@ public: boost::optional<double> _lastAvgCount; }; + /** + * Controls the per-second rate at which queries against a collection are sampled on this + * sampler. Uses the token bucket algorithm. 
+ */ + class SampleRateLimiter { + public: + static constexpr double kEpsilon = 0.001; + + SampleRateLimiter(ServiceContext* serviceContext, + const NamespaceString& nss, + const UUID& collUuid, + double numTokensPerSecond) + : _serviceContext(serviceContext), + _nss(nss), + _collUuid(collUuid), + _numTokensPerSecond(numTokensPerSecond) { + invariant(_numTokensPerSecond > 0); + _lastRefillTime = _serviceContext->getFastClockSource()->now(); + }; + + const NamespaceString& getNss() const { + return _nss; + } + + const UUID& getCollectionUuid() const { + return _collUuid; + } + + double getRate() const { + return _numTokensPerSecond; + } + + double getBurstCapacity() const { + return _getBurstCapacity(_numTokensPerSecond); + } + + /** + * Requests to consume one token from the bucket. Causes the bucket to be refilled with + * tokens created since last refill time. Does not block if the bucket is empty or there is + * fewer than one token in the bucket. Returns true if a token has been consumed + * successfully, and false otherwise. + */ + bool tryConsume(); + + /** + * Sets a new rate. Causes the bucket to be refilled with tokens created since last refill + * time according to the previous rate. + */ + void refreshRate(double numTokensPerSecond); + + private: + /** + * Returns the maximum number of tokens that a bucket with the given rate can store at any + * given time. + */ + static double _getBurstCapacity(double numTokensPerSecond); + + /** + * Fills the bucket with tokens created since last refill time according to the given rate + * and burst capacity. + */ + void _refill(double numTokensPerSecond, double burstCapacity); + + const ServiceContext* _serviceContext; + const NamespaceString _nss; + const UUID _collUuid; + double _numTokensPerSecond; + + // The bucket is only refilled when there is a consume request or a rate refresh. 
+ Date_t _lastRefillTime; + double _lastNumTokens = 0; + }; + QueryAnalysisSampler() = default; ~QueryAnalysisSampler() = default; @@ -100,6 +174,13 @@ public: void onShutdown(); + /** + * Returns true if a query should be sampled, and false otherwise. Can only be invoked once + * for each query since it decrements the number of remaining queries to sample if this query + * should be sampled. + */ + bool shouldSample(const NamespaceString& nss); + void refreshQueryStatsForTest() { _refreshQueryStats(); } @@ -113,9 +194,9 @@ public: _refreshConfigurations(opCtx); } - std::vector<CollectionQueryAnalyzerConfiguration> getConfigurationsForTest() const { + std::map<NamespaceString, SampleRateLimiter> getRateLimitersForTest() const { stdx::lock_guard<Latch> lk(_mutex); - return _configurations; + return _sampleRateLimiters; } private: @@ -129,7 +210,7 @@ private: QueryStats _queryStats; PeriodicJobAnchor _periodicConfigurationsRefresher; - std::vector<CollectionQueryAnalyzerConfiguration> _configurations; + std::map<NamespaceString, SampleRateLimiter> _sampleRateLimiters; }; } // namespace analyze_shard_key |