Diffstat (limited to 'src/mongo/transport/service_executor_test.cpp')
 src/mongo/transport/service_executor_test.cpp | 367
 1 file changed, 353 insertions(+), 14 deletions(-)
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 7bf605cc28a..70605762783 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -50,10 +50,12 @@
 #include "mongo/unittest/matcher.h"
 #include "mongo/unittest/thread_assertion_monitor.h"
 #include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/notification.h"
 #include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/fail_point.h"
 #include "mongo/util/future.h"
 #include "mongo/util/scopeguard.h"
+#include "mongo/util/synchronized_value.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
 
@@ -63,10 +65,125 @@ namespace {
 
 namespace m = unittest::match;
 
+using unittest::stringify::stringifyForAssert;
+
 constexpr auto kWorkerThreadRunTime = Milliseconds{1000};
 // Run time + generous scheduling time slice
 constexpr auto kShutdownTime = Milliseconds{kWorkerThreadRunTime.count() + 50};
 
+template <typename M>
+class AtomicWordLoadIs : public m::Matcher {
+public:
+    explicit AtomicWordLoadIs(M m) : _m{std::move(m)} {}
+
+    std::string describe() const {
+        return "AtomicWordLoadIs({})"_format(_m.describe());
+    }
+
+    template <typename T>
+    m::MatchResult match(const T& x) const {
+        auto r = x.load();
+        if (auto mr = _m.match(r); !mr)
+            return {false,
+                    "{} failed {}{}"_format(stringifyForAssert(r), _m.describe(), mr.message())};
+        return {};
+    }
+
+private:
+    M _m;
+};
+
+/** Matches a ServiceExecutor or the BSONObj it produces with its `appendStats`. */
+template <typename ThreadMatch, typename ClientMatch>
+class ExecStatsIs : public m::Matcher {
+public:
+    ExecStatsIs(std::string execStatsLabel, ThreadMatch tm, ClientMatch cm)
+        : _execStatsLabel{std::move(execStatsLabel)}, _tm{std::move(tm)}, _cm{std::move(cm)} {}
+
+    std::string describe() const {
+        return "ExecStatsIs({},{})"_format(_tm.describe(), _cm.describe());
+    }
+
+    m::MatchResult match(const BSONObj& x) const {
+        unittest::stringify::Joiner joiner;
+        bool ok = true;
+        auto obj = x[_execStatsLabel].Obj();
+
+        auto tIn = obj["threadsRunning"].Int();
+        if (auto tmr = _tm.match(tIn); !tmr) {
+            joiner("threadsRunning={} failed {}{}"_format(
+                stringifyForAssert(tIn), _tm.describe(), tmr.message()));
+            ok = false;
+        }
+
+        auto cIn = obj["clientsInTotal"].Int();
+        if (auto cmr = _cm.match(cIn); !cmr) {
+            joiner("clientsInTotal={} failed {}{}"_format(
+                stringifyForAssert(cIn), _cm.describe(), cmr.message()));
+            ok = false;
+        }
+        return {ok, std::string{joiner}};
+    }
+
+    m::MatchResult match(const ServiceExecutor& exec) const {
+        BSONObjBuilder bob;
+        exec.appendStats(&bob);
+        BSONObj obj = bob.done();
+        if (auto mr = match(obj); !mr)
+            return {false, "obj={}, message={}"_format(obj.toString(), mr.message())};
+        return {};
+    }
+
+private:
+    std::string _execStatsLabel;
+    ThreadMatch _tm;
+    ClientMatch _cm;
+};
+
+/**
+ * Match is re-evaluated repeatedly with an exponential backoff, up to some
+ * limit, at which time this enclosing matcher fails.
+ */
+template <typename M>
+class SoonMatches : public m::Matcher {
+public:
+    explicit SoonMatches(M&& m, int retries = 16) : _m{std::forward<M>(m)}, _retries{retries} {}
+
+    std::string describe() const {
+        return "SoonMatches({},{})"_format(_m.describe(), _retries);
+    }
+
+    template <typename X>
+    m::MatchResult match(const X& x) const {
+        // Fibonacci generator for slow integral exponential backoff.
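+        // (With the default 16 retries the sleeps are 0, 1, 1, 2, ..., 610 ms,
+        // i.e. roughly 1.6 seconds of total backoff before giving up.)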
+        auto fib = [seq = std::array<int64_t, 2>{0, 1}]() mutable {
+            auto r = seq[0];
+            seq[0] = seq[1];
+            seq[1] = r + seq[0];
+            return r;
+        };
+        m::MatchResult mr;
+        for (int retries = _retries; retries--;) {
+            if (mr = _m.match(x); mr)
+                return mr;
+            Milliseconds backoff{fib()};
+            LOGV2_DEBUG(1715120,
+                        1,
+                        "Retry",
+                        "matcher"_attr = describe(),
+                        "retries"_attr = retries,
+                        "backoff"_attr = backoff,
+                        "message"_attr = mr.message());
+            sleepFor(backoff);
+        }
+        return {false, "No result matched after {} tries: {}"_format(_retries, mr.message())};
+    }
+
+private:
+    M _m;
+    int _retries;
+};
+
 class JoinThread : public stdx::thread {
 public:
     using stdx::thread::thread;
@@ -142,30 +259,252 @@ private:
     asio::io_context _ioContext;
 };
 
-class ServiceExecutorSynchronousTest : public unittest::Test {
+/**
+ * ServiceExecutorSynchronous and ServiceExecutorReserved are closely related.
+ * This is a common basis for the fixtures that test them.
+ */
+template <typename Derived>
+class ServiceExecutorSynchronousTestBase : public unittest::Test {
 public:
-    void setUp() override {
-        setGlobalServiceContext(ServiceContext::make());
+    auto execStatsElementMatcher(int threads, int clients) {
+        return ExecStatsIs(getStatsLabel(), m::Eq(threads), m::Eq(clients));
+    }
+
+    void testCreateDestroy() {
+        makeExecutor();
+    }
+
+    void testStartStop() {
+        auto executor = makeExecutor();
+        ASSERT_OK(executor.start());
+        ASSERT_OK(executor.shutdown(kShutdownTime));
+    }
+
+    void testMakeTaskRunnerFailsBeforeStartup() {
+        auto executor = makeExecutor();
+        ASSERT_THROWS(executor.makeTaskRunner(), DBException);
     }
 
-    void tearDown() override {
-        setGlobalServiceContext(nullptr);
+    void testMakeTaskRunner() {
+        auto executor = makeExecutor();
+        ASSERT_OK(executor.start());
+        executor.makeTaskRunner();
+        ASSERT_OK(executor.shutdown(kShutdownTime));
+    }
+
+    void testMakeTaskRunnerMultiple() {
+        auto reserved = getReserved();
+        auto executor = makeExecutor();
+#define LOCAL_CHECK_STATS(threads, clients) \
+    ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(threads, clients)))
+        ASSERT_OK(executor.start());
+        LOCAL_CHECK_STATS(reserved, 0);
+        std::vector<std::unique_ptr<ServiceExecutor::Executor>> runners;
+        // Add a few more beyond the reserve.
+        for (size_t i = 0; i < reserved + 3; ++i) {
+            runners.push_back(executor.makeTaskRunner());
+            LOCAL_CHECK_STATS(runners.size() + reserved, runners.size()) << ", i:" << i;
+        }
+        ASSERT_OK(executor.shutdown(kShutdownTime));
+        LOCAL_CHECK_STATS(0, 0);
+#undef LOCAL_CHECK_STATS
+    }
+
+    void testBasicTaskRuns() {
+        auto executor = makeExecutor();
+        ASSERT_OK(executor.start());
+        PromiseAndFuture<void> pf;
+        auto runner = executor.makeTaskRunner();
+        runner->schedule([&](Status st) { pf.promise.setFrom(st); });
+        ASSERT_DOES_NOT_THROW(pf.future.get());
+        ASSERT_OK(executor.shutdown(kShutdownTime));
+    }
+
+    void testShutdownTimeout() {
+        auto executor = makeExecutor();
+        ASSERT_OK(executor.start());
+        auto runner = executor.makeTaskRunner();
+        PromiseAndFuture<void> taskStarted;
+        runner->schedule([&](Status st) {
+            taskStarted.promise.setFrom(st);
+            sleepFor(Milliseconds{2000});
+        });
+        taskStarted.future.get();
+        ASSERT_THAT(executor.shutdown(Milliseconds{1000}),
+                    m::StatusIs(m::Eq(ErrorCodes::ExceededTimeLimit), m::Any()));
+    }
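+    // (The scheduled task above sleeps for 2000ms, past the 1000ms grace
+    // period given to shutdown, so shutdown is expected to report
+    // ExceededTimeLimit rather than wait for the task to finish.)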
+
+    // Should tolerate the failure to spawn all these reserved threads.
+    void testManyLeases() {
+        auto executor = makeExecutor();
+        ASSERT_OK(executor.start());
+        for (size_t i = 0; i < 10; ++i) {
+            std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+            for (size_t j = 0; j < 20; ++j)
+                leases.push_back(executor.makeTaskRunner());
+        }
+    }
+
+    decltype(auto) makeExecutor() {
+        return _d().makeExecutor();
+    }
+
+    virtual std::string getStatsLabel() const = 0;
+
+    virtual size_t getReserved() const = 0;
+
+    std::unique_ptr<FailPointEnableBlock> makeFailSpawnBlock() {
+        return std::make_unique<FailPointEnableBlock>(
+            "serviceExecutorSynchronousThreadFailToSpawn");
+    }
+
+private:
+    decltype(auto) _d() const {
+        return static_cast<const Derived&>(*this);
+    }
+    decltype(auto) _d() {
+        return static_cast<Derived&>(*this);
+    }
+};
+
+class ServiceExecutorSynchronousTest
+    : public ServiceExecutorSynchronousTestBase<ServiceExecutorSynchronousTest> {
+public:
+    ServiceExecutorSynchronous makeExecutor() const {
+        return {};
+    }
+    std::string getStatsLabel() const override {
+        return "passthrough";
+    }
+    size_t getReserved() const override {
+        return 0;
     }
 };
 
-TEST_F(ServiceExecutorSynchronousTest, BasicTaskRuns) {
-    ServiceExecutorSynchronous executor(getGlobalServiceContext());
+class ServiceExecutorReservedTest
+    : public ServiceExecutorSynchronousTestBase<ServiceExecutorReservedTest> {
+public:
+    ServiceExecutorReserved makeExecutor() const {
+        return {"testReserved", reserved, maxIdleThreads};
+    }
+    std::string getStatsLabel() const override {
+        return "reserved";
+    }
+    size_t getReserved() const override {
+        return reserved;
+    }
+
+protected:
+    size_t reserved = 5;
+    size_t maxIdleThreads = 0;
+};
+
+#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, case) \
+    TEST_F(fixture, case) {                                          \
+        test##case ();                                               \
+    }
+
+/**
+ * Expand this macro to instantiate the test cases for each of the corresponding
+ * member functions of the fixture base class. These are tests that
+ * ServiceExecutorSynchronous and ServiceExecutorReserved should pass.
+ */
+#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(fixture)                              \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, CreateDestroy)                    \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, StartStop)                        \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, BasicTaskRuns)                    \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerFailsBeforeStartup) \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunner)                   \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerMultiple)           \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ShutdownTimeout)                  \
+    SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ManyLeases)                       \
+    /**/
+
+SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorSynchronousTest)
+SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorReservedTest)
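+
+// For reference, each expansion above pastes an ordinary TEST_F body that
+// forwards to the corresponding fixture member, e.g.:
+//
+//     TEST_F(ServiceExecutorReservedTest, StartStop) {
+//         testStartStop();
+//     }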
+
+#define SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(exec, threads, clients) \
+    ASSERT_THAT(exec, SoonMatches(execStatsElementMatcher(threads, clients)))
+
+// Create leases until the lease count exceeds the reserve count of threads, and
+// check that the executor keeps its thread count above the number of leases by
+// a margin of `reserved` as it goes.
+TEST_F(ServiceExecutorReservedTest, CreateLeaseBeyondReserve) {
+#define LOCAL_CHECK_STATS(threads, clients) \
+    SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(executor, threads, clients)
+    reserved = 5;
+    auto executor = makeExecutor();
     ASSERT_OK(executor.start());
-    PromiseAndFuture<void> pf;
-    auto runner = executor.makeTaskRunner();
-    runner->schedule([&](Status st) { pf.promise.setFrom(st); });
-    ASSERT_DOES_NOT_THROW(pf.future.get());
+    std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+    while (leases.size() < reserved + 1) {
+        leases.push_back(executor.makeTaskRunner());
+        LOCAL_CHECK_STATS(leases.size() + reserved, leases.size());
+    }
+    while (!leases.empty()) {
+        leases.pop_back();
+        LOCAL_CHECK_STATS(leases.size() + reserved, leases.size());
+    }
     ASSERT_OK(executor.shutdown(kShutdownTime));
+    LOCAL_CHECK_STATS(0, 0);
+#undef LOCAL_CHECK_STATS
+}
+
+TEST_F(ServiceExecutorReservedTest, ImmediateThrowFromNoReserveSpawnFailure) {
+    reserved = 0;
+    auto executor = makeExecutor();
+    ASSERT_OK(executor.start());
+    auto failSpawns = makeFailSpawnBlock();
+    ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, 0)));
+    ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>);
+    failSpawns = {};
+    ASSERT_DOES_NOT_THROW(executor.makeTaskRunner());
 }
 
-TEST_F(ServiceExecutorSynchronousTest, MakeTaskRunnerFailsBeforeStartup) {
-    ServiceExecutorSynchronous executor{getGlobalServiceContext()};
-    ASSERT_THROWS(executor.makeTaskRunner(), DBException);
+// The basic point of the "reserved" ServiceExecutor is to allow new connections
+// during time periods in which spawns are failing. Verify this fundamental
+// requirement of the reserved ServiceExecutor.
+TEST_F(ServiceExecutorReservedTest, ReserveMitigatesSpawnFailures) {
+    reserved = 5;
+    auto executor = makeExecutor();
+    ASSERT_OK(executor.start());
+    ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0));
+    auto failSpawns = makeFailSpawnBlock();
+    std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+    while (leases.size() < reserved)
+        leases.push_back(executor.makeTaskRunner());
+    // One worker is in the starting state while it unsuccessfully attempts to spawn.
+    // After the failure, we expect it to be removed from the starting bucket.
+    ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, leases.size())))
+        << "Should be sufficient reserve threads for demand during setup";
+    ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>)
+        << "Should throw when out of reserve threads";
+    failSpawns = {};
+    ASSERT_DOES_NOT_THROW(executor.makeTaskRunner());
+}
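+
+// (In the two tests above, resetting `failSpawns` to an empty pointer destroys
+// the FailPointEnableBlock, deactivating the fail point and restoring normal
+// thread spawning.)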
+
+// Check that workers are kept alive after their leases expire according to maxIdleThreads.
+TEST_F(ServiceExecutorReservedTest, MaxIdleThreads) {
+    for (reserved = 0; reserved != 5; ++reserved) {
+        for (maxIdleThreads = 0; maxIdleThreads != 5; ++maxIdleThreads) {
+            for (size_t leaseCount = 0; leaseCount != reserved + maxIdleThreads; ++leaseCount) {
+                auto executor = makeExecutor();
+                ASSERT_OK(executor.start());
+                ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0));
+
+                std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+                while (leases.size() < leaseCount)
+                    leases.push_back(executor.makeTaskRunner());
+                leases.clear();
+
+                ASSERT_THAT(executor,
+                            SoonMatches(execStatsElementMatcher(
+                                reserved + std::min(maxIdleThreads, leaseCount), 0)))
+                    << ", reserved=" << reserved  //
+                    << ", maxIdleThreads=" << maxIdleThreads  //
+                    << ", leaseCount=" << leaseCount;
+            }
+        }
+    }
+}
 
 class ServiceExecutorFixedTest : public unittest::Test {