Diffstat (limited to 'src/mongo/transport/service_executor_test.cpp')
-rw-r--r-- src/mongo/transport/service_executor_test.cpp | 367
1 files changed, 353 insertions, 14 deletions
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 7bf605cc28a..70605762783 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -50,10 +50,12 @@
#include "mongo/unittest/matcher.h"
#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/future.h"
#include "mongo/util/scopeguard.h"
+#include "mongo/util/synchronized_value.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
@@ -63,10 +65,125 @@ namespace {
namespace m = unittest::match;
+using unittest::stringify::stringifyForAssert;
+
constexpr auto kWorkerThreadRunTime = Milliseconds{1000};
// Run time + generous scheduling time slice
constexpr auto kShutdownTime = Milliseconds{kWorkerThreadRunTime.count() + 50};
+template <typename M>
+class AtomicWordLoadIs : public m::Matcher {
+public:
+ explicit AtomicWordLoadIs(M m) : _m{std::move(m)} {}
+
+ std::string describe() const {
+ return "AtomicWordLoadIs({})"_format(_m.describe());
+ }
+
+ template <typename T>
+ m::MatchResult match(const T& x) const {
+ auto r = x.load();
+ if (auto mr = _m.match(r); !mr)
+ return {false,
+ "{} failed {}{}"_format(stringifyForAssert(r), _m.describe(), mr.message())};
+ return {};
+ }
+
+private:
+ M _m;
+};
+
+/** Matches a ServiceExecutor, or the BSONObj that its `appendStats` produces. */
+template <typename ThreadMatch, typename ClientMatch>
+class ExecStatsIs : public m::Matcher {
+public:
+ ExecStatsIs(std::string execStatsLabel, ThreadMatch tm, ClientMatch cm)
+ : _execStatsLabel{std::move(execStatsLabel)}, _tm{std::move(tm)}, _cm{std::move(cm)} {}
+
+ std::string describe() const {
+ return "ExecStatsIs({},{})"_format(_tm.describe(), _cm.describe());
+ }
+
+ m::MatchResult match(const BSONObj& x) const {
+ unittest::stringify::Joiner joiner;
+ bool ok = true;
+ auto obj = x[_execStatsLabel].Obj();
+
+ auto tIn = obj["threadsRunning"].Int();
+ if (auto tmr = _tm.match(tIn); !tmr) {
+ joiner("threadsRunning={} failed {}{}"_format(
+ stringifyForAssert(tIn), _tm.describe(), tmr.message()));
+ ok = false;
+ }
+
+ auto cIn = obj["clientsInTotal"].Int();
+ if (auto cmr = _cm.match(cIn); !cmr) {
+ joiner("clientsInTotal={} failed {}{}"_format(
+ stringifyForAssert(cIn), _cm.describe(), cmr.message()));
+ ok = false;
+ }
+ return {ok, std::string{joiner}};
+ }
+
+ m::MatchResult match(const ServiceExecutor& exec) const {
+ BSONObjBuilder bob;
+ exec.appendStats(&bob);
+ BSONObj obj = bob.done();
+ if (auto mr = match(obj); !mr)
+ return {false, "obj={}, message={}"_format(obj.toString(), mr.message())};
+ return {};
+ }
+
+private:
+ std::string _execStatsLabel;
+ ThreadMatch _tm;
+ ClientMatch _cm;
+};
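+
+// For reference, the stats object consumed by `ExecStatsIs` is expected to
+// look roughly like (field values hypothetical):
+//   {"<execStatsLabel>": {threadsRunning: 5, clientsInTotal: 3, ...}}
+// where the label is "passthrough" or "reserved" depending on the executor.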
+
+/**
+ * The wrapped match is re-evaluated repeatedly with an exponential backoff,
+ * up to a retry limit, at which point this enclosing matcher fails.
+ */
+template <typename M>
+class SoonMatches : public m::Matcher {
+public:
+ explicit SoonMatches(M&& m, int retries = 16) : _m{std::forward<M>(m)}, _retries{retries} {}
+
+ std::string describe() const {
+ return "SoonMatches({},{})"_format(_m.describe(), _retries);
+ }
+
+ template <typename X>
+ m::MatchResult match(const X& x) const {
+ // Fibonacci generator for slow integral exponential backoff.
+ auto fib = [seq = std::array<int64_t, 2>{0, 1}]() mutable {
+ auto r = seq[0];
+ seq[0] = seq[1];
+ seq[1] = r + seq[0];
+ return r;
+ };
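+ // With the default 16 retries, the successive sleeps are the first 16
+ // Fibonacci numbers (0 through 610 ms), roughly 1.6 seconds in total
+ // before the matcher gives up.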
+ m::MatchResult mr;
+ for (int retries = _retries; retries--;) {
+ if (mr = _m.match(x); mr)
+ return mr;
+ Milliseconds backoff{fib()};
+ LOGV2_DEBUG(1715120,
+ 1,
+ "Retry",
+ "matcher"_attr = describe(),
+ "retries"_attr = retries,
+ "backoff"_attr = backoff,
+ "message"_attr = mr.message());
+ sleepFor(backoff);
+ }
+ return {false, "No result matched after {} tries: {}"_format(_retries, mr.message())};
+ }
+
+private:
+ M _m;
+ int _retries;
+};
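+
+// Usage sketch (arguments hypothetical): poll until the executor's stats
+// settle, e.g.
+//   ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(5, 0)));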
+
class JoinThread : public stdx::thread {
public:
using stdx::thread::thread;
@@ -142,30 +259,252 @@ private:
asio::io_context _ioContext;
};
-class ServiceExecutorSynchronousTest : public unittest::Test {
+/**
+ * ServiceExecutorSynchronous and ServiceExecutorReserved are closely related.
+ * This is a common basis for the fixtures that test them.
+ */
+template <typename Derived>
+class ServiceExecutorSynchronousTestBase : public unittest::Test {
public:
- void setUp() override {
- setGlobalServiceContext(ServiceContext::make());
+ auto execStatsElementMatcher(int threads, int clients) {
+ return ExecStatsIs(getStatsLabel(), m::Eq(threads), m::Eq(clients));
+ }
+
+ void testCreateDestroy() {
+ makeExecutor();
+ }
+
+ void testStartStop() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ }
+
+ void testMakeTaskRunnerFailsBeforeStartup() {
+ auto executor = makeExecutor();
+ ASSERT_THROWS(executor.makeTaskRunner(), DBException);
}
- void tearDown() override {
- setGlobalServiceContext(nullptr);
+ void testMakeTaskRunner() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ executor.makeTaskRunner();
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ }
+
+ void testMakeTaskRunnerMultiple() {
+ auto reserved = getReserved();
+ auto executor = makeExecutor();
+#define LOCAL_CHECK_STATS(threads, clients) \
+ ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(threads, clients)))
+ ASSERT_OK(executor.start());
+ LOCAL_CHECK_STATS(reserved, 0);
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> runners;
+ // Add a few more beyond the reserve.
+ for (size_t i = 0; i < reserved + 3; ++i) {
+ runners.push_back(executor.makeTaskRunner());
+ LOCAL_CHECK_STATS(runners.size() + reserved, runners.size()) << ", i:" << i;
+ }
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ LOCAL_CHECK_STATS(0, 0);
+#undef LOCAL_CHECK_STATS
+ }
+
+ void testBasicTaskRuns() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ PromiseAndFuture<void> pf;
+ auto runner = executor.makeTaskRunner();
+ runner->schedule([&](Status st) { pf.promise.setFrom(st); });
+ ASSERT_DOES_NOT_THROW(pf.future.get());
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ }
+
+ void testShutdownTimeout() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ auto runner = executor.makeTaskRunner();
+ PromiseAndFuture<void> taskStarted;
+ runner->schedule([&](Status st) {
+ taskStarted.promise.setFrom(st);
+ sleepFor(Milliseconds{2000});
+ });
+ taskStarted.future.get();
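+ // The scheduled task is still sleeping (~2 seconds), so the 1 second
+ // shutdown deadline should be exceeded.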
+ ASSERT_THAT(executor.shutdown(Milliseconds{1000}),
+ m::StatusIs(m::Eq(ErrorCodes::ExceededTimeLimit), m::Any()));
+ }
+
+ // Should tolerate the failure to spawn all these reserved threads.
+ void testManyLeases() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ for (size_t i = 0; i < 10; ++i) {
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ for (size_t j = 0; j < 20; ++j)
+ leases.push_back(executor.makeTaskRunner());
+ }
+ }
+
+ decltype(auto) makeExecutor() {
+ return _d().makeExecutor();
+ }
+
+ virtual std::string getStatsLabel() const = 0;
+
+ virtual size_t getReserved() const = 0;
+
+ std::unique_ptr<FailPointEnableBlock> makeFailSpawnBlock() {
+ return std::make_unique<FailPointEnableBlock>(
+ "serviceExecutorSynchronousThreadFailToSpawn");
+ }
+
+private:
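+ // CRTP downcasts: reach the concrete fixture's makeExecutor() so that it
+ // can return the executor by value without a virtual factory.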
+ decltype(auto) _d() const {
+ return static_cast<const Derived&>(*this);
+ }
+ decltype(auto) _d() {
+ return static_cast<Derived&>(*this);
+ }
+};
+
+class ServiceExecutorSynchronousTest
+ : public ServiceExecutorSynchronousTestBase<ServiceExecutorSynchronousTest> {
+public:
+ ServiceExecutorSynchronous makeExecutor() const {
+ return {};
+ }
+ std::string getStatsLabel() const override {
+ return "passthrough";
+ }
+ size_t getReserved() const override {
+ return 0;
}
};
-TEST_F(ServiceExecutorSynchronousTest, BasicTaskRuns) {
- ServiceExecutorSynchronous executor(getGlobalServiceContext());
+class ServiceExecutorReservedTest
+ : public ServiceExecutorSynchronousTestBase<ServiceExecutorReservedTest> {
+public:
+ ServiceExecutorReserved makeExecutor() const {
+ return {"testReserved", reserved, maxIdleThreads};
+ }
+ std::string getStatsLabel() const override {
+ return "reserved";
+ }
+ size_t getReserved() const override {
+ return reserved;
+ }
+
+protected:
+ size_t reserved = 5;
+ size_t maxIdleThreads = 0;
+};
+
+#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, case) \
+ TEST_F(fixture, case) { \
+ test##case (); \
+ }
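+
+// For example,
+//   SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(ServiceExecutorSynchronousTest, StartStop)
+// expands to:
+//   TEST_F(ServiceExecutorSynchronousTest, StartStop) { testStartStop(); }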
+
+/**
+ * Expand this macro to instantiate the test cases for each of the corresponding
+ * member functions of the fixture base class. These are tests that
+ * ServiceExecutorSynchronous and ServiceExecutorReserved should pass.
+ */
+#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(fixture) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, CreateDestroy) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, StartStop) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, BasicTaskRuns) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerFailsBeforeStartup) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunner) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerMultiple) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ShutdownTimeout) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ManyLeases) \
+ /**/
+
+SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorSynchronousTest)
+SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorReservedTest)
+
+#define SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(exec, threads, clients) \
+ ASSERT_THAT(exec, SoonMatches(execStatsElementMatcher(threads, clients)))
+
+// Create leases until the number of leases exceeds the reserve count of
+// threads, and check that the executor keeps its thread count above the
+// number of leases by a margin of `reserved` as it goes.
+TEST_F(ServiceExecutorReservedTest, CreateLeaseBeyondReserve) {
+#define LOCAL_CHECK_STATS(threads, clients) \
+ SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(executor, threads, clients)
+ reserved = 5;
+ auto executor = makeExecutor();
ASSERT_OK(executor.start());
- PromiseAndFuture<void> pf;
- auto runner = executor.makeTaskRunner();
- runner->schedule([&](Status st) { pf.promise.setFrom(st); });
- ASSERT_DOES_NOT_THROW(pf.future.get());
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ while (leases.size() < reserved + 1) {
+ leases.push_back(executor.makeTaskRunner());
+ LOCAL_CHECK_STATS(leases.size() + reserved, leases.size());
+ }
+ while (!leases.empty()) {
+ leases.pop_back();
+ LOCAL_CHECK_STATS(leases.size() + reserved, leases.size());
+ }
ASSERT_OK(executor.shutdown(kShutdownTime));
+ LOCAL_CHECK_STATS(0, 0);
+#undef LOCAL_CHECK_STATS
+}
+
+TEST_F(ServiceExecutorReservedTest, ImmediateThrowFromNoReserveSpawnFailure) {
+ reserved = 0;
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ auto failSpawns = makeFailSpawnBlock();
+ ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, 0)));
+ ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>);
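+ // Destroying the FailPointEnableBlock disables the fail point, so
+ // spawning should succeed again.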
+ failSpawns = {};
+ ASSERT_DOES_NOT_THROW(executor.makeTaskRunner());
}
-TEST_F(ServiceExecutorSynchronousTest, MakeTaskRunnerFailsBeforeStartup) {
- ServiceExecutorSynchronous executor{getGlobalServiceContext()};
- ASSERT_THROWS(executor.makeTaskRunner(), DBException);
+// The basic point of the "reserved" ServiceExecutor is to allow new
+// connections during periods in which thread spawns are failing. Verify this
+// fundamental requirement.
+TEST_F(ServiceExecutorReservedTest, ReserveMitigatesSpawnFailures) {
+ reserved = 5;
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0));
+ auto failSpawns = makeFailSpawnBlock();
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ while (leases.size() < reserved)
+ leases.push_back(executor.makeTaskRunner());
+ // One worker is in the starting state while it unsuccessfully attempts to spawn.
+ // After the failure, we expect it to be removed from the starting bucket.
+ ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, leases.size())))
+ << "Should be sufficient reserve threads for demand during setup";
+ ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>)
+ << "Should throw when out of reserve threads";
+ failSpawns = {};
+ ASSERT_DOES_NOT_THROW(executor.makeTaskRunner());
+}
+
+// Check that workers are kept alive after their leases expire according to maxIdleThreads.
+TEST_F(ServiceExecutorReservedTest, MaxIdleThreads) {
+ for (reserved = 0; reserved != 5; ++reserved) {
+ for (maxIdleThreads = 0; maxIdleThreads != 5; ++maxIdleThreads) {
+ for (size_t leaseCount = 0; leaseCount != reserved + maxIdleThreads; ++leaseCount) {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0));
+
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ while (leases.size() < leaseCount)
+ leases.push_back(executor.makeTaskRunner());
+ leases.clear();
+
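+ // After all leases are dropped, the executor should retain its reserve
+ // plus up to maxIdleThreads of the previously leased workers.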
+ ASSERT_THAT(executor,
+ SoonMatches(execStatsElementMatcher(
+ reserved + std::min(maxIdleThreads, leaseCount), 0)))
+ << ", reserved=" << reserved //
+ << ", maxIdleThreads=" << maxIdleThreads //
+ << ", leaseCount=" << leaseCount;
+ }
+ }
+ }
}
class ServiceExecutorFixedTest : public unittest::Test {