summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/execution_control
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2023-03-13 18:00:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-13 19:32:57 +0000
commit31b639c5f4cf66744a2b4cf6d6c6932c15d32373 (patch)
treeb17c2fa412fcb883b7ec4f80446426aa773b1e3e /src/mongo/db/storage/execution_control
parentceebc338815821bde76cc7d9a547b996ea6f873e (diff)
downloadmongo-31b639c5f4cf66744a2b4cf6d6c6932c15d32373.tar.gz
SERVER-73824 Unit test throughput probing algorithm
Diffstat (limited to 'src/mongo/db/storage/execution_control')
-rw-r--r--src/mongo/db/storage/execution_control/throughput_probing_test.cpp266
1 files changed, 266 insertions, 0 deletions
diff --git a/src/mongo/db/storage/execution_control/throughput_probing_test.cpp b/src/mongo/db/storage/execution_control/throughput_probing_test.cpp
new file mode 100644
index 00000000000..340cb616b5a
--- /dev/null
+++ b/src/mongo/db/storage/execution_control/throughput_probing_test.cpp
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/storage/execution_control/throughput_probing.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo::execution_control {
+namespace {
+
+class MockPeriodicJob : public PeriodicRunner::ControllableJob {
+public:
+ explicit MockPeriodicJob(PeriodicRunner::PeriodicJob job) : _job(std::move(job)) {}
+
+ void start() override {}
+ void pause() override {}
+ void resume() override {}
+ void stop() override {}
+
+ Milliseconds getPeriod() override {
+ return _job.interval;
+ }
+
+ void setPeriod(Milliseconds period) override {
+ _job.interval = period;
+ }
+
+ void run(Client* client) {
+ _job.job(client);
+ }
+
+private:
+ PeriodicRunner::PeriodicJob _job;
+};
+
+class MockPeriodicRunner : public PeriodicRunner {
+public:
+ JobAnchor makeJob(PeriodicJob job) override {
+ invariant(!_job);
+ auto mockJob = std::make_shared<MockPeriodicJob>(std::move(job));
+ _job = mockJob;
+ return JobAnchor{std::move(mockJob)};
+ }
+
+ void run(Client* client) {
+ invariant(_job);
+ _job->run(client);
+ }
+
+private:
+ std::shared_ptr<MockPeriodicJob> _job;
+};
+
+class ThroughputProbingTest : public unittest::Test {
+protected:
+ explicit ThroughputProbingTest(int32_t size = 64)
+ : _runner([svcCtx = _svcCtx.get()] {
+ auto runner = std::make_unique<MockPeriodicRunner>();
+ auto runnerPtr = runner.get();
+ svcCtx->setPeriodicRunner(std::move(runner));
+ return runnerPtr;
+ }()),
+ _readTicketHolder(size),
+ _writeTicketHolder(size) {}
+
+ void _run() {
+ _runner->run(_client.get());
+ }
+
+ ServiceContext::UniqueServiceContext _svcCtx = ServiceContext::make();
+ ServiceContext::UniqueClient _client = _svcCtx->makeClient("ThroughputProbingTest");
+
+ MockPeriodicRunner* _runner;
+ MockTicketHolder _readTicketHolder;
+ MockTicketHolder _writeTicketHolder;
+
+ ThroughputProbing _througputProbing{
+ _svcCtx.get(), &_readTicketHolder, &_writeTicketHolder, Milliseconds{1}};
+};
+
+class ThroughputProbingMaxConcurrencyTest : public ThroughputProbingTest {
+protected:
+ ThroughputProbingMaxConcurrencyTest() : ThroughputProbingTest(128) {}
+};
+
+class ThroughputProbingMinConcurrencyTest : public ThroughputProbingTest {
+protected:
+ ThroughputProbingMinConcurrencyTest() : ThroughputProbingTest(5) {}
+};
+
+TEST_F(ThroughputProbingTest, ProbeUpSucceeds) {
+ // Tickets are exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size);
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Probe up next since tickets are exhausted.
+ _run();
+ ASSERT_GT(_readTicketHolder.outof(), size);
+ ASSERT_GT(_writeTicketHolder.outof(), size);
+
+ // Throughput inreases.
+ size = _readTicketHolder.outof();
+ _readTicketHolder.setNumFinishedProcessing(3);
+
+ // Probing up succeeds; the new value is promoted to stable.
+ _run();
+ ASSERT_EQ(_readTicketHolder.outof(), size);
+ ASSERT_EQ(_writeTicketHolder.outof(), size);
+}
+
+TEST_F(ThroughputProbingTest, ProbeUpFailsDownSucceeds) {
+ // Tickets are exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size);
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Probe up next since tickets are exhausted.
+ _run();
+ ASSERT_GT(_readTicketHolder.outof(), size);
+ ASSERT_GT(_writeTicketHolder.outof(), size);
+
+ // Throughput does not increase.
+ _readTicketHolder.setNumFinishedProcessing(2);
+
+ // Probing up fails since throughput did not increase. Probe down next.
+ _run();
+ ASSERT_LT(_readTicketHolder.outof(), size);
+ ASSERT_LT(_writeTicketHolder.outof(), size);
+
+ // Throughput inreases.
+ size = _readTicketHolder.outof();
+ _readTicketHolder.setNumFinishedProcessing(4);
+
+ // Probing down succeeds; the new value is promoted to stable.
+ _run();
+ ASSERT_EQ(_readTicketHolder.outof(), size);
+ ASSERT_EQ(_writeTicketHolder.outof(), size);
+}
+
+TEST_F(ThroughputProbingTest, ProbeUpFailsDownFails) {
+ // Tickets are exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size);
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Probe up next since tickets are exhausted.
+ _run();
+ ASSERT_GT(_readTicketHolder.outof(), size);
+ ASSERT_GT(_writeTicketHolder.outof(), size);
+
+ // Throughput does not increase.
+ _readTicketHolder.setNumFinishedProcessing(2);
+
+ // Probing up fails since throughput did not increase. Probe down next.
+ _run();
+ ASSERT_LT(_readTicketHolder.outof(), size);
+ ASSERT_LT(_writeTicketHolder.outof(), size);
+
+ // Throughput does not increase.
+ _readTicketHolder.setNumFinishedProcessing(3);
+
+ // Probing down fails since throughput did not increase. Return back to stable.
+ _run();
+ ASSERT_EQ(_readTicketHolder.outof(), size);
+ ASSERT_EQ(_writeTicketHolder.outof(), size);
+}
+
+TEST_F(ThroughputProbingTest, ProbeDownSucceeds) {
+ // Tickets are not exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Probe down next since tickets are not exhausted.
+ _run();
+ ASSERT_LT(_readTicketHolder.outof(), size);
+ ASSERT_LT(_writeTicketHolder.outof(), size);
+
+ // Throughput increases.
+ size = _readTicketHolder.outof();
+ _readTicketHolder.setNumFinishedProcessing(3);
+
+ // Probing down succeeds; the new value is promoted to stable.
+ _run();
+ ASSERT_EQ(_readTicketHolder.outof(), size);
+ ASSERT_EQ(_writeTicketHolder.outof(), size);
+}
+
+TEST_F(ThroughputProbingTest, ProbeDownFails) {
+ // Tickets are not exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Probe down next since tickets are not exhausted.
+ _run();
+ ASSERT_LT(_readTicketHolder.outof(), size);
+ ASSERT_LT(_writeTicketHolder.outof(), size);
+
+ // Throughput does not increase.
+ _readTicketHolder.setNumFinishedProcessing(2);
+
+ // Probing down fails since throughput did not increase. Return back to stable.
+ _run();
+ ASSERT_EQ(_readTicketHolder.outof(), size);
+ ASSERT_EQ(_writeTicketHolder.outof(), size);
+}
+
+TEST_F(ThroughputProbingMaxConcurrencyTest, NoProbeUp) {
+ // Tickets are exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size);
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Probe down since concurrency is already at its maximum allowed value, even though
+ // ticktes are exhausted.
+ _run();
+ ASSERT_LT(_readTicketHolder.outof(), size);
+ ASSERT_LT(_writeTicketHolder.outof(), size);
+}
+
+TEST_F(ThroughputProbingMinConcurrencyTest, NoProbeDown) {
+ // Tickets are not exhausted.
+ auto size = _readTicketHolder.outof();
+ _readTicketHolder.setUsed(size - 1);
+ _readTicketHolder.setNumFinishedProcessing(1);
+
+ // Stable. Do not probe in either direction since tickets are not exhausted but concurrency is
+ // already at its minimum allowed value.
+ _run();
+ ASSERT_EQ(_readTicketHolder.outof(), size);
+ ASSERT_EQ(_writeTicketHolder.outof(), size);
+}
+
+} // namespace
+} // namespace mongo::execution_control