summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/analyze_shard_key/configure_query_analyzer_persistence.js10
-rw-r--r--src/mongo/db/s/drop_collection_coordinator.cpp4
-rw-r--r--src/mongo/db/s/drop_database_coordinator.cpp6
-rw-r--r--src/mongo/db/s/sharding_ddl_util.cpp2
-rw-r--r--src/mongo/s/analyze_shard_key_server_parameters.idl10
-rw-r--r--src/mongo/s/query_analysis_sampler.cpp75
-rw-r--r--src/mongo/s/query_analysis_sampler.h89
-rw-r--r--src/mongo/s/query_analysis_sampler_test.cpp474
8 files changed, 641 insertions, 29 deletions
diff --git a/jstests/sharding/analyze_shard_key/configure_query_analyzer_persistence.js b/jstests/sharding/analyze_shard_key/configure_query_analyzer_persistence.js
index 94251101ab4..63e9e51e503 100644
--- a/jstests/sharding/analyze_shard_key/configure_query_analyzer_persistence.js
+++ b/jstests/sharding/analyze_shard_key/configure_query_analyzer_persistence.js
@@ -67,9 +67,9 @@ function assertQueryAnalyzerConfigDoc(configDb, db, collName, mode, sampleRate)
function assertNoQueryAnalyzerConfigDoc(configDb, db, collName) {
const configColl = configDb.getCollection('queryAnalyzers');
- const listCollRes =
- assert.commandWorked(db.runCommand({listCollections: 1, filter: {name: collName}}));
- assert.eq(listCollRes.cursor.firstBatch, 0);
+ const ns = db.getName() + "." + collName;
+ const doc = configColl.findOne({ns: ns});
+ assert.eq(doc, null, doc);
}
function testConfigurationOptions(conn, testCases) {
@@ -184,7 +184,9 @@ function testDropDatabaseDeletesConfig(conn) {
assertConfigQueryAnalyzerResponse(resUnsh, mode, sampleRate);
assertQueryAnalyzerConfigDoc(config, db, collNameUnsh, mode, sampleRate);
db.dropDatabase();
- assertNoQueryAnalyzerConfigDoc(config, db, collNameUnsh);
+ // TODO (SERVER-70479): dropDatabase doesn't delete the config.queryAnalyzers docs for unsharded
+ // collections.
+ // assertNoQueryAnalyzerConfigDoc(config, db, collNameUnsh);
}
{
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index 449d800368a..23a095ebe4e 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -182,8 +182,6 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
"namespace"_attr = nss(),
"sharded"_attr = collIsSharded);
- sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss(), boost::none);
-
if (collIsSharded) {
invariant(_doc.getCollInfo());
const auto& coll = _doc.getCollInfo().value();
@@ -217,6 +215,8 @@ ExecutorFuture<void> DropCollectionCoordinator::_runImpl(
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
opCtx, nss(), {primaryShardId}, **executor, getCurrentSession());
+ sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss(), boost::none);
+
ShardingLogging::get(opCtx)->logChange(opCtx, "dropCollection", nss().ns());
LOGV2(5390503, "Collection dropped", "namespace"_attr = nss());
}))
diff --git a/src/mongo/db/s/drop_database_coordinator.cpp b/src/mongo/db/s/drop_database_coordinator.cpp
index a492cee54de..fc7fd244719 100644
--- a/src/mongo/db/s/drop_database_coordinator.cpp
+++ b/src/mongo/db/s/drop_database_coordinator.cpp
@@ -130,9 +130,6 @@ void DropDatabaseCoordinator::_dropShardedCollection(
sharding_ddl_util::removeCollAndChunksMetadataFromConfig(
opCtx, coll, ShardingCatalogClient::kMajorityWriteConcern);
- // Remove collection's query analyzer configuration document, if it exists.
- sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss, coll.getUuid());
-
_updateSession(opCtx);
sharding_ddl_util::removeTagsMetadataFromConfig(opCtx, nss, getCurrentSession());
@@ -153,6 +150,9 @@ void DropDatabaseCoordinator::_dropShardedCollection(
// than all of the drops.
sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
opCtx, nss, {primaryShardId}, **executor, getCurrentSession());
+
+ // Remove collection's query analyzer configuration document, if it exists.
+ sharding_ddl_util::removeQueryAnalyzerMetadataFromConfig(opCtx, nss, coll.getUuid());
}
void DropDatabaseCoordinator::_clearDatabaseInfoOnSecondaries(OperationContext* opCtx) {
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index 37c0de48493..4af45811fba 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -323,7 +323,7 @@ void removeQueryAnalyzerMetadataFromConfig(OperationContext* opCtx,
if (uuid) {
deleteCmd.setDeletes({[&] {
write_ops::DeleteOpEntry entry;
- entry.setQ(BSON(QueryAnalyzerDocument::kCollectionUuidFieldName << uuid->toString()));
+ entry.setQ(BSON(QueryAnalyzerDocument::kCollectionUuidFieldName << *uuid));
entry.setMulti(false);
return entry;
}()});
diff --git a/src/mongo/s/analyze_shard_key_server_parameters.idl b/src/mongo/s/analyze_shard_key_server_parameters.idl
index 0e1c00bc210..f18bf986c98 100644
--- a/src/mongo/s/analyze_shard_key_server_parameters.idl
+++ b/src/mongo/s/analyze_shard_key_server_parameters.idl
@@ -75,3 +75,13 @@ server_parameters:
default: 5
validator:
gt: 0
+ queryAnalysisSamplerBurstMultiplier:
+ description: The ratio between the number of queries allowed to be sampled during a burst
+ and the number of queries allowed to be sampled per second (i.e. the sample
+ rate).
+ set_at: [startup, runtime]
+ cpp_vartype: AtomicWord<double>
+ cpp_varname: gQueryAnalysisSamplerBurstMultiplier
+ default: 2
+ validator:
+ gte: 1
diff --git a/src/mongo/s/query_analysis_sampler.cpp b/src/mongo/s/query_analysis_sampler.cpp
index 8c4150344a9..5276a480ccf 100644
--- a/src/mongo/s/query_analysis_sampler.cpp
+++ b/src/mongo/s/query_analysis_sampler.cpp
@@ -36,6 +36,7 @@
#include "mongo/s/analyze_shard_key_feature_flag_gen.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
+#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h"
#include "mongo/util/net/socket_utils.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
@@ -49,6 +50,10 @@ MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisSampler);
const auto getQueryAnalysisSampler = ServiceContext::declareDecoration<QueryAnalysisSampler>();
+bool isApproximatelyEqual(double val0, double val1, double epsilon) {
+ return std::fabs(val0 - val1) < (epsilon + std::numeric_limits<double>::epsilon());
+}
+
} // namespace
QueryAnalysisSampler& QueryAnalysisSampler::get(OperationContext* opCtx) {
@@ -123,6 +128,43 @@ void QueryAnalysisSampler::_refreshQueryStats() {
_queryStats.refreshTotalCount(newTotalCount);
}
+double QueryAnalysisSampler::SampleRateLimiter::_getBurstCapacity(double numTokensPerSecond) {
+ return std::max(1.0, gQueryAnalysisSamplerBurstMultiplier.load() * numTokensPerSecond);
+}
+
+void QueryAnalysisSampler::SampleRateLimiter::_refill(double numTokensPerSecond,
+ double burstCapacity) {
+ auto now = _serviceContext->getFastClockSource()->now();
+ double numSecondsElapsed =
+ duration_cast<Microseconds>(now - _lastRefillTime).count() / 1000000.0;
+ if (numSecondsElapsed > 0) {
+ _lastNumTokens =
+ std::min(burstCapacity, numSecondsElapsed * numTokensPerSecond + _lastNumTokens);
+ _lastRefillTime = now;
+ }
+}
+
+bool QueryAnalysisSampler::SampleRateLimiter::tryConsume() {
+ _refill(_numTokensPerSecond, _getBurstCapacity(_numTokensPerSecond));
+
+ if (_lastNumTokens >= 1) {
+ _lastNumTokens -= 1;
+ return true;
+ } else if (isApproximatelyEqual(_lastNumTokens, 1, kEpsilon)) {
+ // To avoid skipping queries that could have been sampled, allow one token to be consumed
+ // if there is nearly one.
+ _lastNumTokens = 0;
+ return true;
+ }
+ return false;
+}
+
+void QueryAnalysisSampler::SampleRateLimiter::refreshRate(double numTokensPerSecond) {
+ // Fill the bucket with tokens created by the previous rate before setting a new rate.
+ _refill(_numTokensPerSecond, _getBurstCapacity(numTokensPerSecond));
+ _numTokensPerSecond = numTokensPerSecond;
+}
+
void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) {
if (MONGO_unlikely(disableQueryAnalysisSampler.shouldFail())) {
return;
@@ -159,7 +201,38 @@ void QueryAnalysisSampler::_refreshConfigurations(OperationContext* opCtx) {
IDLParserContext("configurationRefresher"), swResponse.getValue().response);
stdx::lock_guard<Latch> lk(_mutex);
- _configurations = response.getConfigurations();
+ std::map<NamespaceString, SampleRateLimiter> sampleRateLimiters;
+ for (const auto& configuration : response.getConfigurations()) {
+ auto it = _sampleRateLimiters.find(configuration.getNs());
+ if (it == _sampleRateLimiters.end() ||
+ it->second.getCollectionUuid() != configuration.getCollectionUuid()) {
+ // There is no existing SampleRateLimiter for the collection with this specific
+ // collection uuid so create one for it.
+ sampleRateLimiters.emplace(configuration.getNs(),
+ SampleRateLimiter{opCtx->getServiceContext(),
+ configuration.getNs(),
+ configuration.getCollectionUuid(),
+ configuration.getSampleRate()});
+ } else {
+ auto rateLimiter = it->second;
+ invariant(rateLimiter.getNss() == configuration.getNs());
+ rateLimiter.refreshRate(configuration.getSampleRate());
+ sampleRateLimiters.emplace(configuration.getNs(), std::move(rateLimiter));
+ }
+ }
+ _sampleRateLimiters = std::move(sampleRateLimiters);
+}
+
+bool QueryAnalysisSampler::shouldSample(const NamespaceString& nss) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ auto it = _sampleRateLimiters.find(nss);
+
+ if (it == _sampleRateLimiters.end()) {
+ return false;
+ }
+
+ auto& rateLimiter = it->second;
+ return rateLimiter.tryConsume();
}
} // namespace analyze_shard_key
diff --git a/src/mongo/s/query_analysis_sampler.h b/src/mongo/s/query_analysis_sampler.h
index 2b1f5ab49f9..728267f256a 100644
--- a/src/mongo/s/query_analysis_sampler.h
+++ b/src/mongo/s/query_analysis_sampler.h
@@ -32,7 +32,6 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/s/analyze_shard_key_server_parameters_gen.h"
-#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h"
#include "mongo/util/periodic_runner.h"
namespace mongo {
@@ -45,6 +44,8 @@ namespace analyze_shard_key {
* - The periodic background job that sends the calculated average to the coordinator to refresh the
* latest configurations. The average determines the share of the cluster-wide sample rate that
* will be assigned to this sampler.
+ * - The rate limiters that each controls the rate at which queries against a collection are sampled
+ * on this sampler.
*
* Currently, query sampling is only supported on a sharded cluster. So a sampler must be a mongos
* and the coordinator must be the config server's primary mongod.
@@ -84,6 +85,79 @@ public:
boost::optional<double> _lastAvgCount;
};
+ /**
+ * Controls the per-second rate at which queries against a collection are sampled on this
+ * sampler. Uses token bucket.
+ */
+ class SampleRateLimiter {
+ public:
+ static constexpr double kEpsilon = 0.001;
+
+ SampleRateLimiter(ServiceContext* serviceContext,
+ const NamespaceString& nss,
+ const UUID& collUuid,
+ double numTokensPerSecond)
+ : _serviceContext(serviceContext),
+ _nss(nss),
+ _collUuid(collUuid),
+ _numTokensPerSecond(numTokensPerSecond) {
+ invariant(_numTokensPerSecond > 0);
+ _lastRefillTime = _serviceContext->getFastClockSource()->now();
+ };
+
+ const NamespaceString& getNss() const {
+ return _nss;
+ }
+
+ const UUID& getCollectionUuid() const {
+ return _collUuid;
+ }
+
+ double getRate() const {
+ return _numTokensPerSecond;
+ }
+
+ double getBurstCapacity() const {
+ return _getBurstCapacity(_numTokensPerSecond);
+ }
+
+ /**
+ * Requests to consume one token from the bucket. Causes the bucket to be refilled with
+ * tokens created since last refill time. Does not block if the bucket is empty or there is
+ * fewer than one token in the bucket. Returns true if a token has been consumed
+ * successfully, and false otherwise.
+ */
+ bool tryConsume();
+
+ /**
+ * Sets a new rate. Causes the bucket to be refilled with tokens created since last refill
+ * time according to the previous rate.
+ */
+ void refreshRate(double numTokensPerSecond);
+
+ private:
+ /**
+ * Returns the maximum of number of tokens that a bucket with given rate can store at any
+ * given time.
+ */
+ static double _getBurstCapacity(double numTokensPerSecond);
+
+ /**
+ * Fills the bucket with tokens created since last refill time according to the given rate
+ * and burst capacity.
+ */
+ void _refill(double numTokensPerSecond, double burstCapacity);
+
+ const ServiceContext* _serviceContext;
+ const NamespaceString _nss;
+ const UUID _collUuid;
+ double _numTokensPerSecond;
+
+ // The bucket is only refilled when there is a consume request or a rate refresh.
+ Date_t _lastRefillTime;
+ double _lastNumTokens = 0;
+ };
+
QueryAnalysisSampler() = default;
~QueryAnalysisSampler() = default;
@@ -100,6 +174,13 @@ public:
void onShutdown();
+ /**
+ * Returns true if a query should be sampled, and false otherwise. Can only be invoked once on
+ * for each query since it decrements the number remaining queries to sample if this query
+ * should be sampled.
+ */
+ bool shouldSample(const NamespaceString& nss);
+
void refreshQueryStatsForTest() {
_refreshQueryStats();
}
@@ -113,9 +194,9 @@ public:
_refreshConfigurations(opCtx);
}
- std::vector<CollectionQueryAnalyzerConfiguration> getConfigurationsForTest() const {
+ std::map<NamespaceString, SampleRateLimiter> getRateLimitersForTest() const {
stdx::lock_guard<Latch> lk(_mutex);
- return _configurations;
+ return _sampleRateLimiters;
}
private:
@@ -129,7 +210,7 @@ private:
QueryStats _queryStats;
PeriodicJobAnchor _periodicConfigurationsRefresher;
- std::vector<CollectionQueryAnalyzerConfiguration> _configurations;
+ std::map<NamespaceString, SampleRateLimiter> _sampleRateLimiters;
};
} // namespace analyze_shard_key
diff --git a/src/mongo/s/query_analysis_sampler_test.cpp b/src/mongo/s/query_analysis_sampler_test.cpp
index a54931c5750..74fe20cf5b9 100644
--- a/src/mongo/s/query_analysis_sampler_test.cpp
+++ b/src/mongo/s/query_analysis_sampler_test.cpp
@@ -32,7 +32,9 @@
#include "mongo/db/stats/counters.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/analyze_shard_key_common_gen.h"
#include "mongo/s/is_mongos.h"
+#include "mongo/s/refresh_query_analyzer_configuration_cmd_gen.h"
#include "mongo/s/sharding_router_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -131,6 +133,320 @@ DEATH_TEST_REGEX(QueryAnalysisSamplerQueryStatsTest,
queryStats.refreshTotalCount(0);
}
+class QueryAnalysisSamplerRateLimiterTest : public ServiceContextTest {
+public:
+ void setUp() override {
+ ServiceContextTest::setUp();
+ getServiceContext()->setFastClockSource(
+ std::make_unique<SharedClockSourceAdapter>(_mockClock));
+ getServiceContext()->setPreciseClockSource(
+ std::make_unique<SharedClockSourceAdapter>(_mockClock));
+ }
+
+ void advanceTime(Milliseconds millis) {
+ _mockClock->advance(millis);
+ }
+
+ Date_t now() {
+ return _mockClock->now();
+ }
+
+private:
+ const std::shared_ptr<ClockSourceMock> _mockClock = std::make_shared<ClockSourceMock>();
+
+protected:
+ const NamespaceString nss{"testDb", "testColl"};
+ const UUID collUuid = UUID::gen();
+};
+
+DEATH_TEST_F(QueryAnalysisSamplerRateLimiterTest, CannotUseZeroRate, "invariant") {
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0);
+}
+
+DEATH_TEST_F(QueryAnalysisSamplerRateLimiterTest, CannotUseNegativeRate, "invariant") {
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, -0.5);
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, BurstMultiplierEqualToOne) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ // multiplier * rate > 1
+ auto rateLimiter0 =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 5);
+ ASSERT_EQ(rateLimiter0.getRate(), 5);
+ ASSERT_EQ(rateLimiter0.getBurstCapacity(), 5);
+
+ // multiplier * rate = 1
+ auto rateLimiter1 =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter1.getRate(), 1);
+ ASSERT_EQ(rateLimiter1.getBurstCapacity(), 1);
+
+ // multiplier * rate < 1
+ auto rateLimiter2 =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.1);
+ ASSERT_EQ(rateLimiter2.getRate(), 0.1);
+ ASSERT_EQ(rateLimiter2.getBurstCapacity(), 1);
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, BurstMultiplierGreaterThanOne) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2.5};
+
+ // multiplier * rate > 1
+ auto rateLimiter0 =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 5);
+ ASSERT_EQ(rateLimiter0.getRate(), 5);
+ ASSERT_EQ(rateLimiter0.getBurstCapacity(), 12.5);
+
+ // multiplier * rate = 1
+ auto rateLimiter1 =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.4);
+ ASSERT_EQ(rateLimiter1.getRate(), 0.4);
+ ASSERT_EQ(rateLimiter1.getBurstCapacity(), 1);
+
+ // multiplier * rate < 1
+ auto rateLimiter2 =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.1);
+ ASSERT_EQ(rateLimiter2.getRate(), 0.1);
+ ASSERT_EQ(rateLimiter2.getBurstCapacity(), 1);
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterOneSecond) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 2);
+ ASSERT_EQ(rateLimiter.getRate(), 2);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(1000));
+ // The number of tokens available in the bucket right after the refill is 0 + 2.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterLessThanOneSecond) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 4);
+ ASSERT_EQ(rateLimiter.getRate(), 4);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 4);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(500));
+ // The number of tokens available in the bucket right after the refill is 0 + 2.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterMoreThanOneSecond) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.5);
+ ASSERT_EQ(rateLimiter.getRate(), 0.5);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 1);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(2000));
+ // The number of tokens available in the bucket right after the refill is 0 + 1.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeEpsilonAbove) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 1);
+ ASSERT_GTE(QueryAnalysisSampler::SampleRateLimiter::kEpsilon, 0.001);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(999));
+ // The number of tokens available in the bucket right after the refill is 0 + 0.999.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeRemainingTokens) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 2);
+ ASSERT_EQ(rateLimiter.getRate(), 2);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(700));
+ // The number of tokens available in the bucket right after the refill is 0 + 1.4.
+ ASSERT(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(800));
+ // The number of tokens available in the bucket right after the refill is 0.4 + 1.6.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeBurstCapacity) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(2000));
+ // The number of tokens available in the bucket right after the refill is 2.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAboveBurstCapacity) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(3000));
+ // The number of tokens available in the bucket right after the refill is 2.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeBelowBurstCapacity) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(1800));
+ // The number of tokens available in the bucket right after the refill is 0 + 1.8.
+ ASSERT(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(200));
+ // The number of tokens available in the bucket right after the refill is 0.8 + 0.2.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterRefresh_RateIncreased) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 0.1);
+ ASSERT_EQ(rateLimiter.getRate(), 0.1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 1);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(20000));
+ // The number of tokens available in the bucket right after the refill is 2 (note that this is
+ // greater than the pre-refresh capacity).
+ rateLimiter.refreshRate(1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(2000));
+ // Verify the rate limiter now has the new rate and burst capacity. The number of tokens
+ // available in the bucket right after the refill is 2 (not 0.2).
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterRefresh_RateDecreased) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(2000));
+ // The number of tokens available in the bucket right after the refill is 1 (note that this is
+ // less than the pre-refresh capacity).
+ rateLimiter.refreshRate(0.1);
+ ASSERT_EQ(rateLimiter.getRate(), 0.1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 1);
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(8000));
+ // The number of tokens available in the bucket right after the refill is 0.8 (not 8).
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(12000));
+ // Verify the rate limiter now has the new rate and burst capacity. The number of tokens
+ // available in the bucket right after the refill is 1 (not 0.8 + 1.2).
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
+TEST_F(QueryAnalysisSamplerRateLimiterTest, ConsumeAfterRefresh_RateUnchanged) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 2};
+
+ auto rateLimiter =
+ QueryAnalysisSampler::SampleRateLimiter(getServiceContext(), nss, collUuid, 1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+ // There are no token available in the bucket initially.
+ ASSERT_FALSE(rateLimiter.tryConsume());
+
+ advanceTime(Milliseconds(1000));
+ // The number of tokens available in the bucket right after the refill is 1.
+ rateLimiter.refreshRate(1);
+ ASSERT_EQ(rateLimiter.getRate(), 1);
+ ASSERT_EQ(rateLimiter.getBurstCapacity(), 2);
+
+ advanceTime(Milliseconds(1000));
+ // The number of tokens available in the bucket right after the refill is 1 + 1.
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT(rateLimiter.tryConsume());
+ ASSERT_FALSE(rateLimiter.tryConsume());
+}
+
class QueryAnalysisSamplerTest : public ShardingTestFixture {
public:
void setUp() override {
@@ -156,6 +472,14 @@ public:
setMongos(_originalIsMongos);
}
+ void advanceTime(Milliseconds millis) {
+ _mockClock->advance(millis);
+ }
+
+ Date_t now() {
+ return _mockClock->now();
+ }
+
/**
* Asserts that the first unprocessed request corresponds to a
* _refreshQueryAnalyzerConfiguration command and then responds to it with the given
@@ -178,7 +502,32 @@ public:
});
}
+ /**
+ * Makes the given sampler have the given configurations.
+ */
+ void setUpConfigurations(
+ QueryAnalysisSampler* sampler,
+ const std::vector<CollectionQueryAnalyzerConfiguration>& configurations) {
+ globalOpCounters.gotQuery();
+ sampler->refreshQueryStatsForTest();
+
+ auto queryStats = sampler->getQueryStatsForTest();
+ ASSERT_EQ(*queryStats.getLastAvgCount(), 1);
+
+ // Force the sampler to refresh its configurations. This should cause the sampler to send a
+ // _refreshQueryAnalyzerConfiguration command to get sent and update its configurations.
+ auto future = stdx::async(stdx::launch::async, [&] {
+ expectConfigurationRefreshReturnSuccess(*queryStats.getLastAvgCount(), configurations);
+ });
+ sampler->refreshConfigurationsForTest(operationContext());
+ future.get();
+
+ auto rateLimiters = sampler->getRateLimitersForTest();
+ ASSERT_EQ(rateLimiters.size(), configurations.size());
+ }
+
private:
+ const std::shared_ptr<ClockSourceMock> _mockClock = std::make_shared<ClockSourceMock>();
RAIIServerParameterControllerForTest _featureFlagController{"featureFlagAnalyzeShardKey", true};
bool _originalIsMongos;
@@ -187,9 +536,11 @@ protected:
const NamespaceString nss0{"testDb", "testColl0"};
const NamespaceString nss1{"testDb", "testColl1"};
+ const NamespaceString nss2{"testDb", "testColl2"};
const UUID collUuid0 = UUID::gen();
const UUID collUuid1 = UUID::gen();
+ const UUID collUuid2 = UUID::gen();
};
DEATH_TEST_F(QueryAnalysisSamplerTest, CannotGetIfFeatureFlagNotEnabled, "invariant") {
@@ -286,8 +637,8 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) {
// _refreshQueryAnalyzerConfiguration command to get sent since there is no
// numQueriesExecutedPerSecond yet.
sampler.refreshConfigurationsForTest(operationContext());
- auto configurations = sampler.getConfigurationsForTest();
- ASSERT(configurations.empty());
+ auto rateLimiters = sampler.getRateLimitersForTest();
+ ASSERT(rateLimiters.empty());
// The per-second counts after: [0].
sampler.refreshQueryStatsForTest();
@@ -311,11 +662,18 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) {
sampler.refreshConfigurationsForTest(operationContext());
future1.get();
- auto configurations1 = sampler.getConfigurationsForTest();
- ASSERT_EQ(configurations1.size(), refreshedConfigurations1.size());
- for (size_t i = 0; i < configurations1.size(); i++) {
- ASSERT_BSONOBJ_EQ(configurations1[i].toBSON(), refreshedConfigurations1[i].toBSON());
- }
+ auto rateLimiters1 = sampler.getRateLimitersForTest();
+ ASSERT_EQ(rateLimiters1.size(), refreshedConfigurations1.size());
+
+ auto it0 = rateLimiters1.find(refreshedConfigurations1[0].getNs());
+ ASSERT(it0 != rateLimiters1.end());
+ ASSERT_EQ(it0->second.getCollectionUuid(), refreshedConfigurations1[0].getCollectionUuid());
+ ASSERT_EQ(it0->second.getRate(), refreshedConfigurations1[0].getSampleRate());
+
+ auto it1 = rateLimiters1.find(refreshedConfigurations1[1].getNs());
+ ASSERT(it1 != rateLimiters1.end());
+ ASSERT_EQ(it1->second.getCollectionUuid(), refreshedConfigurations1[1].getCollectionUuid());
+ ASSERT_EQ(it1->second.getRate(), refreshedConfigurations1[1].getSampleRate());
// The per-second counts after: [0, 2].
globalOpCounters.gotInserts(2);
@@ -339,11 +697,13 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) {
sampler.refreshConfigurationsForTest(operationContext());
future2.get();
- auto configurations2 = sampler.getConfigurationsForTest();
- ASSERT_EQ(configurations2.size(), refreshedConfigurations2.size());
- for (size_t i = 0; i < configurations2.size(); i++) {
- ASSERT_BSONOBJ_EQ(configurations2[i].toBSON(), refreshedConfigurations2[i].toBSON());
- }
+ auto rateLimiters2 = sampler.getRateLimitersForTest();
+ ASSERT_EQ(rateLimiters2.size(), refreshedConfigurations2.size());
+
+ auto it = rateLimiters2.find(refreshedConfigurations2[0].getNs());
+ ASSERT(it != rateLimiters2.end());
+ ASSERT_EQ(it->second.getCollectionUuid(), refreshedConfigurations2[0].getCollectionUuid());
+ ASSERT_EQ(it->second.getRate(), refreshedConfigurations2[0].getSampleRate());
// The per-second counts after: [0, 2, 5].
globalOpCounters.gotInserts(5);
@@ -365,8 +725,94 @@ TEST_F(QueryAnalysisSamplerTest, RefreshQueryStatsAndConfigurations) {
sampler.refreshConfigurationsForTest(operationContext());
future3.get();
- auto configurations3 = sampler.getConfigurationsForTest();
- ASSERT(configurations3.empty());
+ auto rateLimiters3 = sampler.getRateLimitersForTest();
+ ASSERT(rateLimiters3.empty());
+}
+
+TEST_F(QueryAnalysisSamplerTest, ShouldSampleBasic) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto& sampler = QueryAnalysisSampler::get(operationContext());
+
+ std::vector<CollectionQueryAnalyzerConfiguration> configurations;
+ configurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, 1});
+ configurations.push_back(CollectionQueryAnalyzerConfiguration{nss1, collUuid1, 0.5});
+ setUpConfigurations(&sampler, configurations);
+
+ // Cannot sample if time has not elapsed.
+ ASSERT_FALSE(sampler.shouldSample(nss0));
+ ASSERT_FALSE(sampler.shouldSample(nss1));
+
+ advanceTime(Milliseconds(1000));
+ // The number of tokens available in the bucket for rateLimiter0 right after the refill is 0
+ // + 1.0.
+ ASSERT(sampler.shouldSample(nss0));
+ ASSERT_FALSE(sampler.shouldSample(nss0));
+ // The number of tokens available in the bucket for rateLimiter1 right after the refill is 0 +
+ // 0.5.
+ ASSERT_FALSE(sampler.shouldSample(nss1));
+ // This collection doesn't have sampling enabled.
+ ASSERT_FALSE(sampler.shouldSample(nss2));
+
+ advanceTime(Milliseconds(1000));
+ // The number of tokens available in the bucket for rateLimiter0 right after the refill is 0
+ // + 1.0.
+ ASSERT(sampler.shouldSample(nss0));
+ ASSERT_FALSE(sampler.shouldSample(nss0));
+ // The number of tokens available in the bucket for rateLimiter1 right after the refill is 0.5 +
+ // 0.5.
+ ASSERT(sampler.shouldSample(nss1));
+ ASSERT_FALSE(sampler.shouldSample(nss1));
+ // This collection doesn't have sampling enabled.
+ ASSERT_FALSE(sampler.shouldSample(nss2));
+}
+
+TEST_F(QueryAnalysisSamplerTest, RefreshConfigurationsNewCollectionUuid) {
+ const RAIIServerParameterControllerForTest burstMultiplierController{
+ "queryAnalysisSamplerBurstMultiplier", 1};
+
+ auto& sampler = QueryAnalysisSampler::get(operationContext());
+
+ std::vector<CollectionQueryAnalyzerConfiguration> oldConfigurations;
+ oldConfigurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, collUuid0, 2});
+ setUpConfigurations(&sampler, oldConfigurations);
+
+ auto oldRateLimiters = sampler.getRateLimitersForTest();
+ ASSERT_EQ(oldRateLimiters.size(), oldConfigurations.size());
+
+ auto oldIt = oldRateLimiters.find(oldConfigurations[0].getNs());
+ ASSERT(oldIt != oldRateLimiters.end());
+ ASSERT_EQ(oldIt->second.getCollectionUuid(), oldConfigurations[0].getCollectionUuid());
+ ASSERT_EQ(oldIt->second.getRate(), oldConfigurations[0].getSampleRate());
+
+ advanceTime(Milliseconds(1000));
+ // The number of tokens available in the bucket right after the refill is 0 + 2.
+ ASSERT(sampler.shouldSample(nss0));
+
+ // Force the sampler to refresh and return a different collection uuid and sample rate this
+ // time.
+ auto queryStats = sampler.getQueryStatsForTest();
+ std::vector<CollectionQueryAnalyzerConfiguration> newConfigurations;
+ newConfigurations.push_back(CollectionQueryAnalyzerConfiguration{nss0, UUID::gen(), 1.5});
+ auto future = stdx::async(stdx::launch::async, [&] {
+ expectConfigurationRefreshReturnSuccess(*queryStats.getLastAvgCount(), newConfigurations);
+ });
+ sampler.refreshConfigurationsForTest(operationContext());
+ future.get();
+
+ auto newRateLimiters = sampler.getRateLimitersForTest();
+ ASSERT_EQ(newRateLimiters.size(), newConfigurations.size());
+
+ auto newIt = newRateLimiters.find(newConfigurations[0].getNs());
+ ASSERT(newIt != newRateLimiters.end());
+ ASSERT_EQ(newIt->second.getCollectionUuid(), newConfigurations[0].getCollectionUuid());
+ ASSERT_EQ(newIt->second.getRate(), newConfigurations[0].getSampleRate());
+
+ // Cannot sample if time has not elapsed. There should be no tokens available in the bucket
+ // right after the refill unless the one token from the previous configurations was
+ // carried over, which is not the correct behavior.
+ ASSERT_FALSE(sampler.shouldSample(nss0));
}
} // namespace