diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2017-10-16 16:26:02 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2017-11-06 17:19:10 -0500 |
commit | e7837911c89af144fe012e5063f8ca88c4c66956 (patch) | |
tree | eb2c141aa289033a400ede246e3478083e5e81bf /src/mongo/transport/service_state_machine_test.cpp | |
parent | dc712619bf21f7c577f28b3f8281bf4c25362511 (diff) | |
download | mongo-e7837911c89af144fe012e5063f8ca88c4c66956.tar.gz |
SERVER-31538 Ensure the ServiceStateMachine always gets cleaned up on error/termination
Diffstat (limited to 'src/mongo/transport/service_state_machine_test.cpp')
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 276 |
1 files changed, 260 insertions, 16 deletions
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index 646431580f7..0e4ca8c15c1 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -39,7 +39,7 @@ #include "mongo/transport/mock_session.h" #include "mongo/transport/mock_ticket.h" #include "mongo/transport/service_entry_point.h" -#include "mongo/transport/service_executor_noop.h" +#include "mongo/transport/service_executor.h" #include "mongo/transport/service_state_machine.h" #include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/unittest.h" @@ -51,6 +51,11 @@ namespace mongo { namespace { +inline std::string stateToString(ServiceStateMachine::State state) { + std::string ret = str::stream() << state; + return ret; +} + class MockSEP : public ServiceEntryPoint { public: virtual ~MockSEP() = default; @@ -122,9 +127,9 @@ public: return TransportLayer::TicketSessionClosedStatus; } - if (_nextMessage) { - *message = *_nextMessage; - } + OpMsgBuilder builder; + builder.setBody(BSON("ping" << 1)); + *message = builder.finish(); return TransportLayerMock::sourceMessage(session, message, expiration); } @@ -154,9 +159,10 @@ public: ASSERT_EQ(_ssm->state(), _lastTicketSource ? ServiceStateMachine::State::SourceWait : ServiceStateMachine::State::SinkWait); - std::stringstream ss; - ss << _ssm->state(); - log() << "In wait. ssm state: " << ss.str(); + + log() << "In wait. ssm state: " << stateToString(_ssm->state()); + if (_waitHook) + _waitHook(); return TransportLayerMock::wait(std::move(ticket)); } @@ -164,10 +170,6 @@ public: MONGO_UNREACHABLE; } - void setNextMessage(Message&& message) { - _nextMessage = std::move(message); - } - void setSSM(ServiceStateMachine* ssm) { _ssm = ssm; } @@ -190,14 +192,18 @@ public: return _ranSource; } + void setWaitHook(stdx::function<void()> hook) { + _waitHook = std::move(hook); + } + private: bool _lastTicketSource = true; bool _ranSink = false; bool _ranSource = false; - boost::optional<Message> _nextMessage; FailureMode _nextShouldFail = Nothing; Message _lastSunk; ServiceStateMachine* _ssm; + stdx::function<void()> _waitHook; }; Message buildRequest(BSONObj input) { @@ -206,6 +212,61 @@ Message buildRequest(BSONObj input) { return builder.finish(); } +class MockServiceExecutor : public ServiceExecutor { +public: + explicit MockServiceExecutor(ServiceContext* ctx) {} + + using ScheduleHook = stdx::function<bool(Task)>; + + Status start() override { + return Status::OK(); + } + Status shutdown(Milliseconds timeout) override { + return Status::OK(); + } + Status schedule(Task task, ScheduleFlags flags) override { + if (!_scheduleHook) { + return Status::OK(); + } else { + return _scheduleHook(std::move(task)) ? Status::OK() : Status{ErrorCodes::InternalError, + "Hook returned error!"}; + } + } + + Mode transportMode() const override { + return Mode::kSynchronous; + } + + void appendStats(BSONObjBuilder* bob) const override {} + + void setScheduleHook(ScheduleHook hook) { + _scheduleHook = std::move(hook); + } + +private: + ScheduleHook _scheduleHook; +}; + +class SimpleEvent { +public: + void signal() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _signaled = true; + _cond.notify_one(); + } + + void wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _cond.wait(lk, [this] { return _signaled; }); + _signaled = false; + } + +private: + stdx::mutex _mutex; + stdx::condition_variable _cond; + bool _signaled = false; +}; + using State = ServiceStateMachine::State; class ServiceStateMachineFixture : public unittest::Test { @@ -223,7 +284,9 @@ protected: _sep = sep.get(); sc->setServiceEntryPoint(std::move(sep)); - sc->setServiceExecutor(stdx::make_unique<ServiceExecutorNoop>(sc)); + auto se = stdx::make_unique<MockServiceExecutor>(sc); + _sexec = se.get(); + sc->setServiceExecutor(std::move(se)); auto tl = stdx::make_unique<MockTL>(); _tl = tl.get(); @@ -236,7 +299,7 @@ protected: } void tearDown() override { - getGlobalServiceContext()->getTransportLayer()->shutdown(); + _tl->shutdown(); } void runPingTest(State first, State second); @@ -244,14 +307,13 @@ protected: MockTL* _tl; MockSEP* _sep; + MockServiceExecutor* _sexec; SessionHandle _session; std::shared_ptr<ServiceStateMachine> _ssm; bool _ranHandler; }; void ServiceStateMachineFixture::runPingTest(State first, State second) { - _tl->setNextMessage(buildRequest(BSON("ping" << 1))); - ASSERT_FALSE(haveClient()); ASSERT_EQ(_ssm->state(), State::Created); log() << "run next"; @@ -329,5 +391,187 @@ TEST_F(ServiceStateMachineFixture, TestSessionCleanupOnDestroy) { ASSERT_TRUE(hookRan); } +// This tests that SSMs that fail to schedule their first task get cleaned up correctly. +// (i.e. we couldn't create a worker thread after accept()). +TEST_F(ServiceStateMachineFixture, ScheduleFailureDuringCreateCleanup) { + _sexec->setScheduleHook([](auto) { return false; }); + // Set a cleanup hook so we know that the cleanup hook actually gets run when the session + // is destroyed + bool hookRan = false; + _ssm->setCleanupHook([&hookRan] { hookRan = true; }); + + _ssm->start(ServiceStateMachine::Ownership::kOwned); + ASSERT_EQ(State::Ended, _ssm->state()); + ASSERT_EQ(_ssm.use_count(), 1); + ASSERT_TRUE(hookRan); +} + +// This tests that calling terminate() actually ends and cleans up the SSM during all the +// states. +TEST_F(ServiceStateMachineFixture, TerminateWorksForAllStates) { + SimpleEvent hookRan, okayToContinue; + + auto cleanupHook = [&hookRan] { + log() << "Cleaning up session"; + hookRan.signal(); + }; + + // This is a shared hook between the executor/TL that lets us notify the test that the SSM + // has reached a certain state and then gets terminated during that state. + State waitFor = State::Created; + SimpleEvent atDesiredState; + auto waitForHook = [this, &waitFor, &atDesiredState, &okayToContinue]() { + log() << "Checking for wakeup at " << stateToString(_ssm->state()) << ". Expecting " + << stateToString(waitFor); + if (_ssm->state() == waitFor) { + atDesiredState.signal(); + okayToContinue.wait(); + } + }; + + // This wraps the waitForHook so that schedules always succeed. + _sexec->setScheduleHook([waitForHook](auto) { + waitForHook(); + return true; + }); + + // This just lets us intercept calls to _tl->wait() and terminate during them. + _tl->setWaitHook(waitForHook); + + // Run this same test for each state. + auto states = {State::Source, State::SourceWait, State::Process, State::SinkWait}; + for (const auto testState : states) { + log() << "Testing termination during " << stateToString(testState); + + // Reset the _ssm to a fresh SSM and reset our tracking variables. + _ssm = ServiceStateMachine::create( + getGlobalServiceContext(), _tl->createSession(), transport::Mode::kSynchronous); + _tl->setSSM(_ssm.get()); + _ssm->setCleanupHook(cleanupHook); + + waitFor = testState; + // This is a dummy thread that just advances the SSM while we track its state/kill it + stdx::thread runner([ssm = _ssm] { + while (ssm->state() != State::Ended) { + ssm->runNext(); + } + }); + + // Wait for the SSM to advance to the expected state + atDesiredState.wait(); + log() << "Terminating session at " << stateToString(_ssm->state()); + + // Terminate the SSM + _ssm->terminate(); + + // Notify the waitForHook to continue and end the session + okayToContinue.signal(); + + // Wait for the SSM to terminate and the thread to end. + hookRan.wait(); + runner.join(); + + // Verify that the SSM terminated and is in the correct state + ASSERT_EQ(State::Ended, _ssm->state()); + ASSERT_EQ(_ssm.use_count(), 1); + } +} + +// This tests that calling terminate() actually ends and cleans up the SSM during all states, and +// with schedule() returning an error for each state. +TEST_F(ServiceStateMachineFixture, TerminateWorksForAllStatesWithScheduleFailure) { + // Set a cleanup hook so we know that the cleanup hook actually gets run when the session + // is destroyed + SimpleEvent hookRan, okayToContinue; + bool scheduleFailed = false; + + auto cleanupHook = [&hookRan] { + log() << "Cleaning up session"; + hookRan.signal(); + }; + + // This is a shared hook between the executor/TL that lets us notify the test that the SSM + // has reached a certain state and then gets terminated during that state. + State waitFor = State::Created; + SimpleEvent atDesiredState; + auto waitForHook = [this, &waitFor, &scheduleFailed, &okayToContinue, &atDesiredState]() { + log() << "Checking for wakeup at " << stateToString(_ssm->state()) << ". Expecting " + << stateToString(waitFor); + if (_ssm->state() == waitFor) { + atDesiredState.signal(); + okayToContinue.wait(); + scheduleFailed = true; + return false; + } + return true; + }; + + _sexec->setScheduleHook([waitForHook](auto) { return waitForHook(); }); + // This wraps the waitForHook and discards its return status. + _tl->setWaitHook([waitForHook] { waitForHook(); }); + + auto states = {State::Source, State::SourceWait, State::Process, State::SinkWait}; + for (const auto testState : states) { + log() << "Testing termination during " << stateToString(testState); + _ssm = ServiceStateMachine::create( + getGlobalServiceContext(), _tl->createSession(), transport::Mode::kSynchronous); + _tl->setSSM(_ssm.get()); + scheduleFailed = false; + _ssm->setCleanupHook(cleanupHook); + + waitFor = testState; + // This is a dummy thread that just advances the SSM while we track its state/kill it + stdx::thread runner([ ssm = _ssm, &scheduleFailed ] { + while (ssm->state() != State::Ended && !scheduleFailed) { + ssm->runNext(); + } + }); + + // Wait for the SSM to advance to the expected state + atDesiredState.wait(); + ASSERT_EQ(_ssm->state(), testState); + log() << "Terminating session at " << stateToString(_ssm->state()); + + // Terminate the SSM + _ssm->terminate(); + + // Notify the waitForHook to continue and end the session + okayToContinue.signal(); + hookRan.wait(); + runner.join(); + + // Verify that the SSM terminated and is in the correct state + ASSERT_EQ(State::Ended, _ssm->state()); + ASSERT_EQ(_ssm.use_count(), 1); + } +} + +// This makes sure that the SSM can run recursively by forcing the ServiceExecutor to run everything +// recursively +TEST_F(ServiceStateMachineFixture, SSMRunsRecursively) { + // This lets us force the SSM to only run once. After sinking the first response, the next call + // to sourceMessage will return with an error. + _tl->setWaitHook([this] { + if (_ssm->state() == State::SinkWait) { + _tl->setNextFailure(); + } + }); + + // The scheduleHook just runs the task, effectively making this a recursive executor. + int recursionDepth = 0; + _sexec->setScheduleHook([&recursionDepth](auto task) { + log() << "running task in executor. depth: " << ++recursionDepth; + task(); + return true; + }); + + _ssm->runNext(); + // Check that the SSM actually ran, is ended, and actually ran recursively + ASSERT_EQ(recursionDepth, 2); + ASSERT_TRUE(_tl->ranSource()); + ASSERT_TRUE(_tl->ranSink()); + ASSERT_EQ(_ssm->state(), State::Ended); +} + } // namespace } // namespace mongo |