diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-10-11 18:44:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-11 19:21:37 +0000 |
commit | 526444c503085e776b3b63e356490a2912342261 (patch) | |
tree | 69a059f1af8de16bc1487341a804ada17b748833 /src | |
parent | 669ac94492976eff25d9cac17c43178a9b1a7ebd (diff) | |
download | mongo-526444c503085e776b3b63e356490a2912342261.tar.gz |
SERVER-70101 Implement token bucket rate limited sampling inside query analyzer
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/drop_collection_coordinator.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/drop_database_coordinator.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/analyze_shard_key_server_parameters.idl | 10 | ||||
-rw-r--r-- | src/mongo/s/query_analysis_sampler.cpp | 75 | ||||
-rw-r--r-- | src/mongo/s/query_analysis_sampler.h | 89 | ||||
-rw-r--r-- | src/mongo/s/query_analysis_sampler_test.cpp | 474 |
7 files changed, 635 insertions, 25 deletions
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index 449d800368a..23a095ebe4e 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -182,8 +182,6 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl( "namespace"_attr = nss(), "sharded"_attr = collIsSharded); - sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss(), boost::none); - if (collIsSharded) { invariant(_doc.getCollInfo()); const auto& coll = _doc.getCollInfo().value(); @@ -217,6 +215,8 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl( sharding_ddl_util::sendDropCollectionParticipantCommandToShards( opCtx, nss(), {primaryShardId}, **executor, getCurrentSession()); + sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss(), boost::none); + ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns()); LOGV2(5390503, "Collection dropped", "namespace"_attr = nss()); })) diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp index a492cee54de..fc7fd244719 100644 --- a/src/mongo/db/s/drop_database_coordinator.cpp +++ b/src/mongo/db/s/drop_database_coordinator.cpp @@ -130,9 +130,6 @@ void DropDatabaseCoordinator::_dropShardedCollection( sharding_ddl_util::removeCollAndChunksMetadataFromConfig( opCtx, coll, ShardingCatalogClient::kMajorityWriteConcern); - // Remove collection's query analyzer configuration document, if it exists. - sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss, coll.getUuid()); - _updateSession(opCtx); sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getCurrentSession()); @@ -153,6 +150,9 @@ void DropDatabaseCoordinator::_dropShardedCollection( // than all of the drops. sharding_ddl_util::sendDropCollectionParticipantCommandToShards( opCtx, nss, {primaryShardId}, **executor, getCurrentSession()); + + // Remove collection's query analyzer configuration document, if it exists. + sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss, coll.getUuid()); } void DropDatabaseCoordinator::_clearDatabaseInfoOnSecondaries(OperationContext* opCtx) { diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 37c0de48493..4af45811fba 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -323,7 +323,7 @@ void removeQueryAnalyzerMetadataFromConfig(OperationContext* opCtx, if (uuid) { deleteCmd.setDeletes({[&] { write_ops::DeleteOpEntry entry; - entry.setQ(BSON(QueryAnalyzerDocument::kCollectionUuidFieldName << uuid->toString())); + entry.setQ(BSON(QueryAnalyzerDocument::kCollectionUuidFieldName << *uuid)); entry.setMulti(false); return entry; }()}); diff --git a/src/mongo/s/analyze_shard_key_server_parameters.idl b/src/mongo/s/analyze_shard_key_server_parameters.idl index 0e1c00bc210..f18bf986c98 100644 --- a/src/mongo/s/analyze_shard_key_server_parameters.idl +++ b/src/mongo/s/analyze_shard_key_server_parameters.idl @@ -75,3 +75,13 @@ server_parameters: default: 5 validator: gt: 0 + queryAnalysisSamplerBurstMultiplier: + description: The ratio between the number of queries allowed to be sampled during a burst + and the number of queries allowed to be sampled per second (i.e. the sample + rate). + set_at: [startup, runtime] + cpp_vartype: AtomicWord<double> + cpp_varname: gQueryAnalysisSamplerBurstMultiplier + default: 2 + validator: + gte: 1 diff --git a/src/mongo/s/query_analysis_sampler.cpp b/src/mongo/s/query_analysis_sampler.cpp index 8c4150344a9..5276a480ccf 100644 --- a/src/mongo/s/query_analysis_sampler.cpp +++ b/src/mongo/s/query_analysis_sampler.cpp @@ -36,6 +36,7 @@ #include "mongo/s/analyze_shard_key_feature_flag_gen.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" +#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h" #include "mongo/util/net/socket_utils.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault @@ -49,6 +50,10 @@ MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisSampler); const auto getQueryAnalysisSampler = ServiceContext::declareDecoration<QueryAnalysisSampler>(); +bool isApproximatelyEqual(double val0, double val1, double epsilon) { + return std::fabs(val0 - val1) < (epsilon + std::numeric_limits<double>::epsilon()); +} + } // namespace QueryAnalysisSampler& QueryAnalysisSampler::get(OperationContext* opCtx) { @@ -123,6 +128,43 @@ void QueryAnalysisSampler::_refreshQueryStats() { _queryStats.refreshTotalCount(newTotalCount); } +double QueryAnalysisSampler::SampleRateLimiter::_getBurstCapacity(double numTokensPerSecond) { + return std::max(1.0, gQueryAnalysisSamplerBurstMultiplier.load() * numTokensPerSecond); +} + +void QueryAnalysisSampler::SampleRateLimiter::_refill(double numTokensPerSecond, + double burstCapacity) { + auto now = _serviceContext->getFastClockSource()->now(); + double numSecondsElapsed = + duration_cast<Microseconds>(now - _lastRefillTime).count() / 1000000.0; + if (numSecondsElapsed > 0) { + _lastNumTokens = + std::min(burstCapacity, numSecondsElapsed * numTokensPerSecond + _lastNumTokens); + _lastRefillTime = now; + } +} + +bool QueryAnalysisSampler::SampleRateLimiter::tryConsume() { + _refill(_numTokensPerSecond, _getBurstCapacity(_numTokensPerSecond)); + + if (_lastNumTokens >= 1) { + _lastNumTokens -= 1; + return true; + } else if (isApproximatelyEqual(_lastNumTokens, 1, kEpsilon)) { + // To avoid skipping queries that could have been sampled, allow one token to be consumed + // if there is nearly one. + _lastNumTokens = 0; + return true; + } + return false; +} + +void QueryAnalysisSampler::SampleRateLimiter::refreshRate(double numTokensPerSecond) { + // Fill the bucket with tokens created by the previous rate before setting a new rate. + _refill(_numTokensPerSecond, _getBurstCapacity(numTokensPerSecond)); + _numTokensPerSecond = numTokensPerSecond; +} + void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) { if (MONGO_unlikely(disableQueryAnalysisSampler.shouldFail())) { return; @@ -159,7 +201,38 @@ void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) { IDLParserContext("configurationRefresher"), swResponse.getValue().response); stdx::lock_guard<Latch> lk(_mutex); - _configurations = response.getConfigurations(); + std::map<NamespaceString, SampleRateLimiter> sampleRateLimiters; + for (const auto& configuration : response.getConfigurations()) { + auto it = _sampleRateLimiters.find(configuration.getNs()); + if (it == _sampleRateLimiters.end() || + it->second.getCollectionUuid() != configuration.getCollectionUuid()) { + // There is no existing SampleRateLimiter for the collection with this specific + // collection uuid so create one for it. + sampleRateLimiters.emplace(configuration.getNs(), + SampleRateLimiter{opCtx->getServiceContext(), + configuration.getNs(), + configuration.getCollectionUuid(), + configuration.getSampleRate()}); + } else { + auto rateLimiter = it->second; + invariant(rateLimiter.getNss() == configuration.getNs()); + rateLimiter.refreshRate(configuration.getSampleRate()); + sampleRateLimiters.emplace(configuration.getNs(), std::move(rateLimiter)); + } + } + _sampleRateLimiters = std::move(sampleRateLimiters); +} + +bool QueryAnalysisSampler::shouldSample(const NamespaceString& nss) { + stdx::lock_guard<Latch> lk(_mutex); + auto it = _sampleRateLimiters.find(nss); + + if (it == _sampleRateLimiters.end()) { + return false; + } + + auto& rateLimiter = it->second; + return rateLimiter.tryConsume(); } } // namespace analyze_shard_key diff --git a/src/mongo/s/query_analysis_sampler.h b/src/mongo/s/query_analysis_sampler.h index 2b1f5ab49f9..728267f256a 100644 --- a/src/mongo/s/query_analysis_sampler.h +++ b/src/mongo/s/query_analysis_sampler.h @@ -32,7 +32,6 @@ #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/s/analyze_shard_key_server_parameters_gen.h" -#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h" #include "mongo/util/periodic_runner.h" namespace mongo { @@ -45,6 +44,8 @@ namespace analyze_shard_key { * - The periodic background job that sends the calculated average to the coordinator to refresh the * latest configurations. The average determines the share of the cluster-wide sample rate that * will be assigned to this sampler. + * - The rate limiters that each controls the rate at which queries against a collection are sampled + * on this sampler. * * Currently, query sampling is only supported on a sharded cluster. So a sampler must be a mongos * and the coordinator must be the config server's primary mongod. @@ -84,6 +85,79 @@ public: boost::optional<double> _lastAvgCount; }; + /** + * Controls the per-second rate at which queries against a collection are sampled on this + * sampler. Uses token bucket. + */ + class SampleRateLimiter { + public: + static constexpr double kEpsilon = 0.001; + + SampleRateLimiter(ServiceContext* serviceContext, + const NamespaceString& nss, + const UUID& collUuid, + double numTokensPerSecond) + : _serviceContext(serviceContext), + _nss(nss), + _collUuid(collUuid), + _numTokensPerSecond(numTokensPerSecond) { + invariant(_numTokensPerSecond > 0); + _lastRefillTime = _serviceContext->getFastClockSource()->now(); + }; + + const NamespaceString& getNss() const { + return _nss; + } + + const UUID& getCollectionUuid() const { + return _collUuid; + } + + double getRate() const { + return _numTokensPerSecond; + } + + double getBurstCapacity() const { + return _getBurstCapacity(_numTokensPerSecond); + } + + /** + * Requests to consume one token from the bucket. Causes the bucket to be refilled with + * tokens created since last refill time. Does not block if the bucket is empty or there is + * fewer than one token in the bucket. Returns true if a token has been consumed + * successfully, and false otherwise. + */ + bool tryConsume(); + + /** + * Sets a new rate. Causes the bucket to be refilled with tokens created since last refill + * time according to the previous rate. + */ + void refreshRate(double numTokensPerSecond); + + private: + /** + * Returns the maximum of number of tokens that a bucket with given rate can store at any + * given time. + */ + static double _getBurstCapacity(double numTokensPerSecond); + + /** + * Fills the bucket with tokens created since last refill time according to the given rate + * and burst capacity. + */ + void _refill(double numTokensPerSecond, double burstCapacity); + + const ServiceContext* _serviceContext; + const NamespaceString _nss; + const UUID _collUuid; + double _numTokensPerSecond; + + // The bucket is only refilled when there is a consume request or a rate refresh. + Date_t _lastRefillTime; + double _lastNumTokens = 0; + }; + QueryAnalysisSampler() = default; ~QueryAnalysisSampler() = default; @@ -100,6 +174,13 @@ public: void onShutdown(); + /** + * Returns true if a query should be sampled, and false otherwise. Can only be invoked once on + * for each query since it decrements the number remaining queries to sample if this query + * should be sampled. + */ + bool shouldSample(const NamespaceString& nss); + void refreshQueryStatsForTest() { _refreshQueryStats(); } @@ -113,9 +194,9 @@ public: _refreshConfigurations(opCtx); } - std::vector<CollectionQueryAnalyzerConfiguration> getConfigurationsForTest() const { + std::map<NamespaceString, SampleRateLimiter> getRateLimitersForTest() const { stdx::lock_guard<Latch> lk(_mutex); - return _configurations; + return _sampleRateLimiters; } private: @@ -129,7 +210,7 @@ private: QueryStats _queryStats; PeriodicJobAnchor _periodicConfigurationsRefresher; - std::vector<CollectionQueryAnalyzerConfiguration> _configurations; + std::map<NamespaceString, SampleRateLimiter> _sampleRateLimiters; }; } // namespace analyze_shard_key diff --git a/src/mongo/s/query_analysis_sampler_test.cpp b/src/mongo/s/query_analysis_sampler_test.cpp index a54931c5750..74fe20cf5b9 100644 --- a/src/mongo/s/query_analysis_sampler_test.cpp +++ b/src/mongo/s/query_analysis_sampler_test.cpp @@ -32,7 +32,9 @@ #include "mongo/db/stats/counters.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" +#include "mongo/s/analyze_shard_key_common_gen.h" #include "mongo/s/is_mongos.h" +#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h" #include "mongo/s/sharding_router_test_fixture.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -131,6 +133,320 @@ DEATH_TEST_REGEX(QueryAnalysisSamplerQueryStatsTest, queryStats.refreshTotalCount(0); } +class QueryAnalysisSamplerRateLimiterTest : public ServiceContextTest { +public: + void setUp() override { + ServiceContextTest::setUp(); + getServiceContext()->setFastClockSource( + std::make_unique<SharedClockSourceAdapter>(_mockClock)); + getServiceContext()->setPreciseClockSource( + std::make_unique<SharedClockSourceAdapter>(_mockClock)); + } + + void advanceTime(Milliseconds millis) { + _mockClock->advance(millis); + } + + Date_t now() { + return _mockClock->now(); + } + +private: + const std::shared_ptr<ClockSourceMock> _mockClock = std::make_shared<ClockSourceMock>(); + +protected: + const NamespaceString nss{"testDb", "testColl"}; + const UUID collUuid = UUID::gen(); +}; + +DEATH_TEST_F(QueryAnalysisSamplerRateLimiterTest, CannotUseZeroRate, "invariant") { + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0); +} + +DEATH_TEST_F(QueryAnalysisSamplerRateLimiterTest, CannotUseNegativeRate, "invariant") { + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, -0.5); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, BurstMultiplierEqualToOne) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + // multiplier * rate > 1 + auto rateLimiter0 = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 5); + ASSERT_EQ(rateLimiter0.getRate(), 5); + ASSERT_EQ(rateLimiter0.getBurstCapacity(), 5); + + // multiplier * rate = 1 + auto rateLimiter1 = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter1.getRate(), 1); + ASSERT_EQ(rateLimiter1.getBurstCapacity(), 1); + + // multiplier * rate < 1 + auto rateLimiter2 = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.1); + ASSERT_EQ(rateLimiter2.getRate(), 0.1); + ASSERT_EQ(rateLimiter2.getBurstCapacity(), 1); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, BurstMultiplierGreaterThanOne) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2.5}; + + // multiplier * rate > 1 + auto rateLimiter0 = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 5); + ASSERT_EQ(rateLimiter0.getRate(), 5); + ASSERT_EQ(rateLimiter0.getBurstCapacity(), 12.5); + + // multiplier * rate = 1 + auto rateLimiter1 = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.4); + ASSERT_EQ(rateLimiter1.getRate(), 0.4); + ASSERT_EQ(rateLimiter1.getBurstCapacity(), 1); + + // multiplier * rate < 1 + auto rateLimiter2 = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.1); + ASSERT_EQ(rateLimiter2.getRate(), 0.1); + ASSERT_EQ(rateLimiter2.getBurstCapacity(), 1); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterOneSecond) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 2); + ASSERT_EQ(rateLimiter.getRate(), 2); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(1000)); + // The number of tokens available in the bucket right after the refill is 0 + 2. + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterLessThanOneSecond) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 4); + ASSERT_EQ(rateLimiter.getRate(), 4); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 4); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(500)); + // The number of tokens available in the bucket right after the refill is 0 + 2. + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterMoreThanOneSecond) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.5); + ASSERT_EQ(rateLimiter.getRate(), 0.5); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 1); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(2000)); + // The number of tokens available in the bucket right after the refill is 0 + 1. + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeEpsilonAbove) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 1); + ASSERT_GTE(QueryAnalysisSampler::SampleRateLimiter::kEpsilon, 0.001); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(999)); + // The number of tokens available in the bucket right after the refill is 0 + 0.999. + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeRemainingTokens) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 2); + ASSERT_EQ(rateLimiter.getRate(), 2); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(700)); + // The number of tokens available in the bucket right after the refill is 0 + 1.4. + ASSERT(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(800)); + // The number of tokens available in the bucket right after the refill is 0.4 + 1.6. + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeBurstCapacity) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(2000)); + // The number of tokens available in the bucket right after the refill is 2. + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAboveBurstCapacity) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(3000)); + // The number of tokens available in the bucket right after the refill is 2. + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeBelowBurstCapacity) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(1800)); + // The number of tokens available in the bucket right after the refill is 0 + 1.8. + ASSERT(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(200)); + // The number of tokens available in the bucket right after the refill is 0.8 + 0.2. + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterRefresh_RateIncreased) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.1); + ASSERT_EQ(rateLimiter.getRate(), 0.1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 1); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(20000)); + // The number of tokens available in the bucket right after the refill is 2 (note that this is + // greater than the pre-refresh capacity). + rateLimiter.refreshRate(1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(2000)); + // Verify the rate limiter now has the new rate and burst capacity. The number of tokens + // available in the bucket right after the refill is 2 (not 0.2). + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterRefresh_RateDecreased) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(2000)); + // The number of tokens available in the bucket right after the refill is 1 (note that this is + // less than the pre-refresh capacity). + rateLimiter.refreshRate(0.1); + ASSERT_EQ(rateLimiter.getRate(), 0.1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 1); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(8000)); + // The number of tokens available in the bucket right after the refill is 0.8 (not 8). + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(12000)); + // Verify the rate limiter now has the new rate and burst capacity. The number of tokens + // available in the bucket right after the refill is 1 (not 0.8 + 1.2). + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + +TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterRefresh_RateUnchanged) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 2}; + + auto rateLimiter = + QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + // There are no token available in the bucket initially. + ASSERT_FALSE(rateLimiter.tryConsume()); + + advanceTime(Milliseconds(1000)); + // The number of tokens available in the bucket right after the refill is 1. + rateLimiter.refreshRate(1); + ASSERT_EQ(rateLimiter.getRate(), 1); + ASSERT_EQ(rateLimiter.getBurstCapacity(), 2); + + advanceTime(Milliseconds(1000)); + // The number of tokens available in the bucket right after the refill is 1 + 1. + ASSERT(rateLimiter.tryConsume()); + ASSERT(rateLimiter.tryConsume()); + ASSERT_FALSE(rateLimiter.tryConsume()); +} + class QueryAnalysisSamplerTest : public ShardingTestFixture { public: void setUp() override { @@ -156,6 +472,14 @@ public: setMongos(_originalIsMongos); } + void advanceTime(Milliseconds millis) { + _mockClock->advance(millis); + } + + Date_t now() { + return _mockClock->now(); + } + /** * Asserts that the first unprocessed request corresponds to a * _refreshQueryAnalyzerConfiguration command and then responds to it with the given @@ -178,7 +502,32 @@ public: }); } + /** + * Makes the given sampler have the given configurations. + */ + void setUpConfigurations( + QueryAnalysisSampler* sampler, + const std::vector<CollectionQueryAnalyzerConfiguration>& configurations) { + globalOpCounters.gotQuery(); + sampler->refreshQueryStatsForTest(); + + auto queryStats = sampler->getQueryStatsForTest(); + ASSERT_EQ(*queryStats.getLastAvgCount(), 1); + + // Force the sampler to refresh its configurations. This should cause the sampler to send a + // _refreshQueryAnalyzerConfiguration command to get sent and update its configurations. + auto future = stdx::async(stdx::launch::async, [&] { + expectConfigurationRefreshReturnSuccess(*queryStats.getLastAvgCount(), configurations); + }); + sampler->refreshConfigurationsForTest(operationContext()); + future.get(); + + auto rateLimiters = sampler->getRateLimitersForTest(); + ASSERT_EQ(rateLimiters.size(), configurations.size()); + } + private: + const std::shared_ptr<ClockSourceMock> _mockClock = std::make_shared<ClockSourceMock>(); RAIIServerParameterControllerForTest _featureFlagController{"featureFlagAnalyzeShardKey", true}; bool _originalIsMongos; @@ -187,9 +536,11 @@ protected: const NamespaceString nss0{"testDb", "testColl0"}; const NamespaceString nss1{"testDb", "testColl1"}; + const NamespaceString nss2{"testDb", "testColl2"}; const UUID collUuid0 = UUID::gen(); const UUID collUuid1 = UUID::gen(); + const UUID collUuid2 = UUID::gen(); }; DEATH_TEST_F(QueryAnalysisSamplerTest, CannotGetIfFeatureFlagNotEnabled, "invariant") { @@ -286,8 +637,8 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) { // _refreshQueryAnalyzerConfiguration command to get sent since there is no // numQueriesExecutedPerSecond yet. sampler.refreshConfigurationsForTest(operationContext()); - auto configurations = sampler.getConfigurationsForTest(); - ASSERT(configurations.empty()); + auto rateLimiters = sampler.getRateLimitersForTest(); + ASSERT(rateLimiters.empty()); // The per-second counts after: [0]. sampler.refreshQueryStatsForTest(); @@ -311,11 +662,18 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) { sampler.refreshConfigurationsForTest(operationContext()); future1.get(); - auto configurations1 = sampler.getConfigurationsForTest(); - ASSERT_EQ(configurations1.size(), refreshedConfigurations1.size()); - for (size_t i = 0; i < configurations1.size(); i++) { - ASSERT_BSONOBJ_EQ(configurations1[i].toBSON(), refreshedConfigurations1[i].toBSON()); - } + auto rateLimiters1 = sampler.getRateLimitersForTest(); + ASSERT_EQ(rateLimiters1.size(), refreshedConfigurations1.size()); + + auto it0 = rateLimiters1.find(refreshedConfigurations1[0].getNs()); + ASSERT(it0 != rateLimiters1.end()); + ASSERT_EQ(it0->second.getCollectionUuid(), refreshedConfigurations1[0].getCollectionUuid()); + ASSERT_EQ(it0->second.getRate(), refreshedConfigurations1[0].getSampleRate()); + + auto it1 = rateLimiters1.find(refreshedConfigurations1[1].getNs()); + ASSERT(it1 != rateLimiters1.end()); + ASSERT_EQ(it1->second.getCollectionUuid(), refreshedConfigurations1[1].getCollectionUuid()); + ASSERT_EQ(it1->second.getRate(), refreshedConfigurations1[1].getSampleRate()); // The per-second counts after: [0, 2]. globalOpCounters.gotInserts(2); @@ -339,11 +697,13 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) { sampler.refreshConfigurationsForTest(operationContext()); future2.get(); - auto configurations2 = sampler.getConfigurationsForTest(); - ASSERT_EQ(configurations2.size(), refreshedConfigurations2.size()); - for (size_t i = 0; i < configurations2.size(); i++) { - ASSERT_BSONOBJ_EQ(configurations2[i].toBSON(), refreshedConfigurations2[i].toBSON()); - } + auto rateLimiters2 = sampler.getRateLimitersForTest(); + ASSERT_EQ(rateLimiters2.size(), refreshedConfigurations2.size()); + + auto it = rateLimiters2.find(refreshedConfigurations2[0].getNs()); + ASSERT(it != rateLimiters2.end()); + ASSERT_EQ(it->second.getCollectionUuid(), refreshedConfigurations2[0].getCollectionUuid()); + ASSERT_EQ(it->second.getRate(), refreshedConfigurations2[0].getSampleRate()); // The per-second counts after: [0, 2, 5]. globalOpCounters.gotInserts(5); @@ -365,8 +725,94 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) { sampler.refreshConfigurationsForTest(operationContext()); future3.get(); - auto configurations3 = sampler.getConfigurationsForTest(); - ASSERT(configurations3.empty()); + auto rateLimiters3 = sampler.getRateLimitersForTest(); + ASSERT(rateLimiters3.empty()); +} + +TEST_F(QueryAnalysisSamplerTest, ShouldSampleBasic) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto& sampler = QueryAnalysisSampler::get(operationContext()); + + std::vector<CollectionQueryAnalyzerConfiguration> configurations; + configurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, 1}); + configurations.push_back(CollectionQueryAnalyzerConfiguration{nss1, collUuid1, 0.5}); + setUpConfigurations(&sampler, configurations); + + // Cannot sample if time has not elapsed. + ASSERT_FALSE(sampler.shouldSample(nss0)); + ASSERT_FALSE(sampler.shouldSample(nss1)); + + advanceTime(Milliseconds(1000)); + // The number of tokens available in the bucket for rateLimiter0 right after the refill is 0 + // + 1.0. + ASSERT(sampler.shouldSample(nss0)); + ASSERT_FALSE(sampler.shouldSample(nss0)); + // The number of tokens available in the bucket for rateLimiter1 right after the refill is 0 + + // 0.5. + ASSERT_FALSE(sampler.shouldSample(nss1)); + // This collection doesn't have sampling enabled. + ASSERT_FALSE(sampler.shouldSample(nss2)); + + advanceTime(Milliseconds(1000)); + // The number of tokens available in the bucket for rateLimiter0 right after the refill is 0 + // + 1.0. + ASSERT(sampler.shouldSample(nss0)); + ASSERT_FALSE(sampler.shouldSample(nss0)); + // The number of tokens available in the bucket for rateLimiter1 right after the refill is 0.5 + + // 0.5. + ASSERT(sampler.shouldSample(nss1)); + ASSERT_FALSE(sampler.shouldSample(nss1)); + // This collection doesn't have sampling enabled. + ASSERT_FALSE(sampler.shouldSample(nss2)); +} + +TEST_F(QueryAnalysisSamplerTest, RefreshConfigurationsNewCollectionUuid) { + const RAIIServerParameterControllerForTest burstMultiplierController{ + "queryAnalysisSamplerBurstMultiplier", 1}; + + auto& sampler = QueryAnalysisSampler::get(operationContext()); + + std::vector<CollectionQueryAnalyzerConfiguration> oldConfigurations; + oldConfigurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, 2}); + setUpConfigurations(&sampler, oldConfigurations); + + auto oldRateLimiters = sampler.getRateLimitersForTest(); + ASSERT_EQ(oldRateLimiters.size(), oldConfigurations.size()); + + auto oldIt = oldRateLimiters.find(oldConfigurations[0].getNs()); + ASSERT(oldIt != oldRateLimiters.end()); + ASSERT_EQ(oldIt->second.getCollectionUuid(), oldConfigurations[0].getCollectionUuid()); + ASSERT_EQ(oldIt->second.getRate(), oldConfigurations[0].getSampleRate()); + + advanceTime(Milliseconds(1000)); + // The number of tokens available in the bucket right after the refill is 0 + 2. + ASSERT(sampler.shouldSample(nss0)); + + // Force the sampler to refresh and return a different collection uuid and sample rate this + // time. + auto queryStats = sampler.getQueryStatsForTest(); + std::vector<CollectionQueryAnalyzerConfiguration> newConfigurations; + newConfigurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, UUID::gen(), 1.5}); + auto future = stdx::async(stdx::launch::async, [&] { + expectConfigurationRefreshReturnSuccess(*queryStats.getLastAvgCount(), newConfigurations); + }); + sampler.refreshConfigurationsForTest(operationContext()); + future.get(); + + auto newRateLimiters = sampler.getRateLimitersForTest(); + ASSERT_EQ(newRateLimiters.size(), newConfigurations.size()); + + auto newIt = newRateLimiters.find(newConfigurations[0].getNs()); + ASSERT(newIt != newRateLimiters.end()); + ASSERT_EQ(newIt->second.getCollectionUuid(), newConfigurations[0].getCollectionUuid()); + ASSERT_EQ(newIt->second.getRate(), newConfigurations[0].getSampleRate()); + + // Cannot sample if time has not elapsed. There should be no tokens available in the bucket + // right after the refill unless the one token from the previous configurations was + // carried over, which is not the correct behavior. + ASSERT_FALSE(sampler.shouldSample(nss0)); } } // namespace |