summaryrefslogtreecommitdiff
path: root/src/mongo/s/query_analysis_sampler.h
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-10-11 18:44:32 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-11 19:21:37 +0000
commit526444c503085e776b3b63e356490a2912342261 (patch)
tree69a059f1af8de16bc1487341a804ada17b748833 /src/mongo/s/query_analysis_sampler.h
parent669ac94492976eff25d9cac17c43178a9b1a7ebd (diff)
downloadmongo-526444c503085e776b3b63e356490a2912342261.tar.gz
SERVER-70101 Implement token bucket rate limited sampling inside query analyzer
Diffstat (limited to 'src/mongo/s/query_analysis_sampler.h')
-rw-r--r--src/mongo/s/query_analysis_sampler.h89
1 files changed, 85 insertions, 4 deletions
diff --git a/src/mongo/s/query_analysis_sampler.h b/src/mongo/s/query_analysis_sampler.h
index 2b1f5ab49f9..728267f256a 100644
--- a/src/mongo/s/query_analysis_sampler.h
+++ b/src/mongo/s/query_analysis_sampler.h
@@ -32,7 +32,6 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/s/analyze_shard_key_server_parameters_gen.h"
-#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h"
#include "mongo/util/periodic_runner.h"
namespace mongo {
@@ -45,6 +44,8 @@ namespace analyze_shard_key {
* - The periodic background job that sends the calculated average to the coordinator to refresh the
* latest configurations. The average determines the share of the cluster-wide sample rate that
* will be assigned to this sampler.
+ * - The rate limiters that each controls the rate at which queries against a collection are sampled
+ * on this sampler.
*
* Currently, query sampling is only supported on a sharded cluster. So a sampler must be a mongos
* and the coordinator must be the config server's primary mongod.
@@ -84,6 +85,79 @@ public:
boost::optional<double> _lastAvgCount;
};
+ /**
+ * Controls the per-second rate at which queries against a collection are sampled on this
+ * sampler. Uses token bucket.
+ */
+ class SampleRateLimiter {
+ public:
+ static constexpr double kEpsilon = 0.001;
+
+ SampleRateLimiter(ServiceContext* serviceContext,
+ const NamespaceString& nss,
+ const UUID& collUuid,
+ double numTokensPerSecond)
+ : _serviceContext(serviceContext),
+ _nss(nss),
+ _collUuid(collUuid),
+ _numTokensPerSecond(numTokensPerSecond) {
+ invariant(_numTokensPerSecond > 0);
+ _lastRefillTime = _serviceContext->getFastClockSource()->now();
+ };
+
+ const NamespaceString& getNss() const {
+ return _nss;
+ }
+
+ const UUID& getCollectionUuid() const {
+ return _collUuid;
+ }
+
+ double getRate() const {
+ return _numTokensPerSecond;
+ }
+
+ double getBurstCapacity() const {
+ return _getBurstCapacity(_numTokensPerSecond);
+ }
+
+ /**
+ * Requests to consume one token from the bucket. Causes the bucket to be refilled with
+ * tokens created since last refill time. Does not block if the bucket is empty or there is
+ * fewer than one token in the bucket. Returns true if a token has been consumed
+ * successfully, and false otherwise.
+ */
+ bool tryConsume();
+
+ /**
+ * Sets a new rate. Causes the bucket to be refilled with tokens created since last refill
+ * time according to the previous rate.
+ */
+ void refreshRate(double numTokensPerSecond);
+
+ private:
+ /**
+ * Returns the maximum of number of tokens that a bucket with given rate can store at any
+ * given time.
+ */
+ static double _getBurstCapacity(double numTokensPerSecond);
+
+ /**
+ * Fills the bucket with tokens created since last refill time according to the given rate
+ * and burst capacity.
+ */
+ void _refill(double numTokensPerSecond, double burstCapacity);
+
+ const ServiceContext* _serviceContext;
+ const NamespaceString _nss;
+ const UUID _collUuid;
+ double _numTokensPerSecond;
+
+ // The bucket is only refilled when there is a consume request or a rate refresh.
+ Date_t _lastRefillTime;
+ double _lastNumTokens = 0;
+ };
+
QueryAnalysisSampler() = default;
~QueryAnalysisSampler() = default;
@@ -100,6 +174,13 @@ public:
void onShutdown();
+ /**
+ * Returns true if a query should be sampled, and false otherwise. Can only be invoked once on
+ * for each query since it decrements the number remaining queries to sample if this query
+ * should be sampled.
+ */
+ bool shouldSample(const NamespaceString& nss);
+
void refreshQueryStatsForTest() {
_refreshQueryStats();
}
@@ -113,9 +194,9 @@ public:
_refreshConfigurations(opCtx);
}
- std::vector<CollectionQueryAnalyzerConfiguration> getConfigurationsForTest() const {
+ std::map<NamespaceString, SampleRateLimiter> getRateLimitersForTest() const {
stdx::lock_guard<Latch> lk(_mutex);
- return _configurations;
+ return _sampleRateLimiters;
}
private:
@@ -129,7 +210,7 @@ private:
QueryStats _queryStats;
PeriodicJobAnchor _periodicConfigurationsRefresher;
- std::vector<CollectionQueryAnalyzerConfiguration> _configurations;
+ std::map<NamespaceString, SampleRateLimiter> _sampleRateLimiters;
};
} // namespace analyze_shard_key