author    Jonathan Reams <jbreams@mongodb.com>    2017-10-16 16:26:02 -0400
committer Jonathan Reams <jbreams@mongodb.com>    2017-11-06 17:19:10 -0500
commit    e7837911c89af144fe012e5063f8ca88c4c66956 (patch)
tree      eb2c141aa289033a400ede246e3478083e5e81bf /src/mongo/transport/service_state_machine_test.cpp
parent    dc712619bf21f7c577f28b3f8281bf4c25362511 (diff)
download  mongo-e7837911c89af144fe012e5063f8ca88c4c66956.tar.gz
SERVER-31538 Ensure the ServiceStateMachine always gets cleaned up on error/termination
Diffstat (limited to 'src/mongo/transport/service_state_machine_test.cpp')
-rw-r--r--  src/mongo/transport/service_state_machine_test.cpp | 276
1 file changed, 260 insertions(+), 16 deletions(-)
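
The tests added below all revolve around one guarantee from the commit message: a cleanup hook runs exactly once no matter how the ServiceStateMachine exits (normal end, a failed schedule(), or terminate()). As background, here is a minimal standalone sketch of that RAII-style guard pattern; the CleanupGuard name is hypothetical and this is not the ServiceStateMachine implementation, only the shape of the guarantee the tests assert.

// Minimal sketch (not MongoDB code): run a cleanup hook exactly once,
// on whichever exit path destroys the owning object.
#include <functional>
#include <iostream>
#include <utility>

class CleanupGuard {
public:
    // Hypothetical analogue of ServiceStateMachine::setCleanupHook in the
    // patch below.
    void setCleanupHook(std::function<void()> hook) {
        _hook = std::move(hook);
    }

    ~CleanupGuard() {
        if (_hook)
            _hook();  // fires on error, termination, or normal destruction
    }

private:
    std::function<void()> _hook;
};

int main() {
    bool hookRan = false;
    {
        CleanupGuard guard;
        guard.setCleanupHook([&hookRan] { hookRan = true; });
        // ... any exit from this scope runs the hook exactly once ...
    }
    std::cout << "hook ran: " << hookRan << "\n";
    return 0;
}

The tests use ServiceStateMachine::setCleanupHook the same way: install a hook that flips a flag or signals an event, drive the SSM down some exit path, and assert the hook fired.
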
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 646431580f7..0e4ca8c15c1 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -39,7 +39,7 @@
#include "mongo/transport/mock_session.h"
#include "mongo/transport/mock_ticket.h"
#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_noop.h"
+#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/transport_layer_mock.h"
#include "mongo/unittest/unittest.h"
@@ -51,6 +51,11 @@
namespace mongo {
namespace {
+inline std::string stateToString(ServiceStateMachine::State state) {
+ std::string ret = str::stream() << state;
+ return ret;
+}
+
class MockSEP : public ServiceEntryPoint {
public:
virtual ~MockSEP() = default;
@@ -122,9 +127,9 @@ public:
return TransportLayer::TicketSessionClosedStatus;
}
- if (_nextMessage) {
- *message = *_nextMessage;
- }
+ OpMsgBuilder builder;
+ builder.setBody(BSON("ping" << 1));
+ *message = builder.finish();
return TransportLayerMock::sourceMessage(session, message, expiration);
}
@@ -154,9 +159,10 @@ public:
ASSERT_EQ(_ssm->state(),
_lastTicketSource ? ServiceStateMachine::State::SourceWait
: ServiceStateMachine::State::SinkWait);
- std::stringstream ss;
- ss << _ssm->state();
- log() << "In wait. ssm state: " << ss.str();
+
+ log() << "In wait. ssm state: " << stateToString(_ssm->state());
+ if (_waitHook)
+ _waitHook();
return TransportLayerMock::wait(std::move(ticket));
}
@@ -164,10 +170,6 @@ public:
MONGO_UNREACHABLE;
}
- void setNextMessage(Message&& message) {
- _nextMessage = std::move(message);
- }
-
void setSSM(ServiceStateMachine* ssm) {
_ssm = ssm;
}
@@ -190,14 +192,18 @@ public:
return _ranSource;
}
+ void setWaitHook(stdx::function<void()> hook) {
+ _waitHook = std::move(hook);
+ }
+
private:
bool _lastTicketSource = true;
bool _ranSink = false;
bool _ranSource = false;
- boost::optional<Message> _nextMessage;
FailureMode _nextShouldFail = Nothing;
Message _lastSunk;
ServiceStateMachine* _ssm;
+ stdx::function<void()> _waitHook;
};
Message buildRequest(BSONObj input) {
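
The MockTL changes above replace the fixed setNextMessage()/_nextMessage plumbing with two mechanisms: sourceMessage() now always synthesizes a {ping: 1} OpMsg, and setWaitHook() lets a test intercept wait() to observe or perturb the SSM mid-state. Reduced to a standalone sketch (hypothetical Channel/MockChannel names, std:: in place of stdx::), the optional-hook pattern looks like this:

// Standalone sketch (not MongoDB code) of the optional test-hook pattern
// behind MockTL::setWaitHook: run an injected callback, if any, before
// delegating to the real behavior.
#include <functional>
#include <iostream>
#include <utility>

class Channel {
public:
    virtual ~Channel() = default;
    virtual void wait() { std::cout << "real wait\n"; }
};

class MockChannel : public Channel {
public:
    void wait() override {
        if (_waitHook)    // a default-constructed std::function is empty,
            _waitHook();  // so tests that install no hook pay nothing
        Channel::wait();
    }
    void setWaitHook(std::function<void()> hook) {
        _waitHook = std::move(hook);
    }

private:
    std::function<void()> _waitHook;
};

int main() {
    MockChannel ch;
    ch.wait();  // no hook installed: just delegates
    ch.setWaitHook([] { std::cout << "test intercepted wait\n"; });
    ch.wait();  // hook fires first, then the real wait runs
    return 0;
}
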
@@ -206,6 +212,61 @@ Message buildRequest(BSONObj input) {
return builder.finish();
}
+class MockServiceExecutor : public ServiceExecutor {
+public:
+ explicit MockServiceExecutor(ServiceContext* ctx) {}
+
+ using ScheduleHook = stdx::function<bool(Task)>;
+
+ Status start() override {
+ return Status::OK();
+ }
+ Status shutdown(Milliseconds timeout) override {
+ return Status::OK();
+ }
+ Status schedule(Task task, ScheduleFlags flags) override {
+ if (!_scheduleHook) {
+ return Status::OK();
+ } else {
+ return _scheduleHook(std::move(task)) ? Status::OK() : Status{ErrorCodes::InternalError,
+ "Hook returned error!"};
+ }
+ }
+
+ Mode transportMode() const override {
+ return Mode::kSynchronous;
+ }
+
+ void appendStats(BSONObjBuilder* bob) const override {}
+
+ void setScheduleHook(ScheduleHook hook) {
+ _scheduleHook = std::move(hook);
+ }
+
+private:
+ ScheduleHook _scheduleHook;
+};
+
+class SimpleEvent {
+public:
+ void signal() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _signaled = true;
+ _cond.notify_one();
+ }
+
+ void wait() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cond.wait(lk, [this] { return _signaled; });
+ _signaled = false;
+ }
+
+private:
+ stdx::mutex _mutex;
+ stdx::condition_variable _cond;
+ bool _signaled = false;
+};
+
using State = ServiceStateMachine::State;
class ServiceStateMachineFixture : public unittest::Test {
@@ -223,7 +284,9 @@ protected:
_sep = sep.get();
sc->setServiceEntryPoint(std::move(sep));
- sc->setServiceExecutor(stdx::make_unique<ServiceExecutorNoop>(sc));
+ auto se = stdx::make_unique<MockServiceExecutor>(sc);
+ _sexec = se.get();
+ sc->setServiceExecutor(std::move(se));
auto tl = stdx::make_unique<MockTL>();
_tl = tl.get();
@@ -236,7 +299,7 @@ protected:
}
void tearDown() override {
- getGlobalServiceContext()->getTransportLayer()->shutdown();
+ _tl->shutdown();
}
void runPingTest(State first, State second);
@@ -244,14 +307,13 @@ protected:
MockTL* _tl;
MockSEP* _sep;
+ MockServiceExecutor* _sexec;
SessionHandle _session;
std::shared_ptr<ServiceStateMachine> _ssm;
bool _ranHandler;
};
void ServiceStateMachineFixture::runPingTest(State first, State second) {
- _tl->setNextMessage(buildRequest(BSON("ping" << 1)));
-
ASSERT_FALSE(haveClient());
ASSERT_EQ(_ssm->state(), State::Created);
log() << "run next";
@@ -329,5 +391,187 @@ TEST_F(ServiceStateMachineFixture, TestSessionCleanupOnDestroy) {
ASSERT_TRUE(hookRan);
}
+// This tests that SSMs that fail to schedule their first task get cleaned up correctly.
+// (i.e. we couldn't create a worker thread after accept()).
+TEST_F(ServiceStateMachineFixture, ScheduleFailureDuringCreateCleanup) {
+ _sexec->setScheduleHook([](auto) { return false; });
+ // Set a cleanup hook so we know that the cleanup hook actually gets run when the session
+ // is destroyed
+ bool hookRan = false;
+ _ssm->setCleanupHook([&hookRan] { hookRan = true; });
+
+ _ssm->start(ServiceStateMachine::Ownership::kOwned);
+ ASSERT_EQ(State::Ended, _ssm->state());
+ ASSERT_EQ(_ssm.use_count(), 1);
+ ASSERT_TRUE(hookRan);
+}
+
+// This tests that calling terminate() actually ends and cleans up the SSM during all the
+// states.
+TEST_F(ServiceStateMachineFixture, TerminateWorksForAllStates) {
+ SimpleEvent hookRan, okayToContinue;
+
+ auto cleanupHook = [&hookRan] {
+ log() << "Cleaning up session";
+ hookRan.signal();
+ };
+
+ // This is a shared hook between the executor/TL that lets us notify the test that the SSM
+ // has reached a certain state and then gets terminated during that state.
+ State waitFor = State::Created;
+ SimpleEvent atDesiredState;
+ auto waitForHook = [this, &waitFor, &atDesiredState, &okayToContinue]() {
+ log() << "Checking for wakeup at " << stateToString(_ssm->state()) << ". Expecting "
+ << stateToString(waitFor);
+ if (_ssm->state() == waitFor) {
+ atDesiredState.signal();
+ okayToContinue.wait();
+ }
+ };
+
+ // This wraps the waitForHook so that schedules always succeed.
+ _sexec->setScheduleHook([waitForHook](auto) {
+ waitForHook();
+ return true;
+ });
+
+ // This just lets us intercept calls to _tl->wait() and terminate during them.
+ _tl->setWaitHook(waitForHook);
+
+ // Run this same test for each state.
+ auto states = {State::Source, State::SourceWait, State::Process, State::SinkWait};
+ for (const auto testState : states) {
+ log() << "Testing termination during " << stateToString(testState);
+
+ // Reset the _ssm to a fresh SSM and reset our tracking variables.
+ _ssm = ServiceStateMachine::create(
+ getGlobalServiceContext(), _tl->createSession(), transport::Mode::kSynchronous);
+ _tl->setSSM(_ssm.get());
+ _ssm->setCleanupHook(cleanupHook);
+
+ waitFor = testState;
+ // This is a dummy thread that just advances the SSM while we track its state/kill it
+ stdx::thread runner([ssm = _ssm] {
+ while (ssm->state() != State::Ended) {
+ ssm->runNext();
+ }
+ });
+
+ // Wait for the SSM to advance to the expected state
+ atDesiredState.wait();
+ log() << "Terminating session at " << stateToString(_ssm->state());
+
+ // Terminate the SSM
+ _ssm->terminate();
+
+ // Notify the waitForHook to continue and end the session
+ okayToContinue.signal();
+
+ // Wait for the SSM to terminate and the thread to end.
+ hookRan.wait();
+ runner.join();
+
+ // Verify that the SSM terminated and is in the correct state
+ ASSERT_EQ(State::Ended, _ssm->state());
+ ASSERT_EQ(_ssm.use_count(), 1);
+ }
+}
+
+// This tests that calling terminate() actually ends and cleans up the SSM during all states, and
+// with schedule() returning an error for each state.
+TEST_F(ServiceStateMachineFixture, TerminateWorksForAllStatesWithScheduleFailure) {
+ // Set a cleanup hook so we know that the cleanup hook actually gets run when the session
+ // is destroyed
+ SimpleEvent hookRan, okayToContinue;
+ bool scheduleFailed = false;
+
+ auto cleanupHook = [&hookRan] {
+ log() << "Cleaning up session";
+ hookRan.signal();
+ };
+
+ // This is a shared hook between the executor/TL that lets us notify the test that the SSM
+ // has reached a certain state and then gets terminated during that state.
+ State waitFor = State::Created;
+ SimpleEvent atDesiredState;
+ auto waitForHook = [this, &waitFor, &scheduleFailed, &okayToContinue, &atDesiredState]() {
+ log() << "Checking for wakeup at " << stateToString(_ssm->state()) << ". Expecting "
+ << stateToString(waitFor);
+ if (_ssm->state() == waitFor) {
+ atDesiredState.signal();
+ okayToContinue.wait();
+ scheduleFailed = true;
+ return false;
+ }
+ return true;
+ };
+
+ _sexec->setScheduleHook([waitForHook](auto) { return waitForHook(); });
+ // This wraps the waitForHook and discards its return status.
+ _tl->setWaitHook([waitForHook] { waitForHook(); });
+
+ auto states = {State::Source, State::SourceWait, State::Process, State::SinkWait};
+ for (const auto testState : states) {
+ log() << "Testing termination during " << stateToString(testState);
+ _ssm = ServiceStateMachine::create(
+ getGlobalServiceContext(), _tl->createSession(), transport::Mode::kSynchronous);
+ _tl->setSSM(_ssm.get());
+ scheduleFailed = false;
+ _ssm->setCleanupHook(cleanupHook);
+
+ waitFor = testState;
+ // This is a dummy thread that just advances the SSM while we track its state/kill it
+ stdx::thread runner([ ssm = _ssm, &scheduleFailed ] {
+ while (ssm->state() != State::Ended && !scheduleFailed) {
+ ssm->runNext();
+ }
+ });
+
+ // Wait for the SSM to advance to the expected state
+ atDesiredState.wait();
+ ASSERT_EQ(_ssm->state(), testState);
+ log() << "Terminating session at " << stateToString(_ssm->state());
+
+ // Terminate the SSM
+ _ssm->terminate();
+
+ // Notify the waitForHook to continue and end the session
+ okayToContinue.signal();
+ hookRan.wait();
+ runner.join();
+
+ // Verify that the SSM terminated and is in the correct state
+ ASSERT_EQ(State::Ended, _ssm->state());
+ ASSERT_EQ(_ssm.use_count(), 1);
+ }
+}
+
+// This makes sure that the SSM can run recursively by forcing the ServiceExecutor to run everything
+// recursively
+TEST_F(ServiceStateMachineFixture, SSMRunsRecursively) {
+ // This lets us force the SSM to only run once. After sinking the first response, the next call
+ // to sourceMessage will return with an error.
+ _tl->setWaitHook([this] {
+ if (_ssm->state() == State::SinkWait) {
+ _tl->setNextFailure();
+ }
+ });
+
+ // The scheduleHook just runs the task, effectively making this a recursive executor.
+ int recursionDepth = 0;
+ _sexec->setScheduleHook([&recursionDepth](auto task) {
+ log() << "running task in executor. depth: " << ++recursionDepth;
+ task();
+ return true;
+ });
+
+ _ssm->runNext();
+ // Check that the SSM actually ran, is ended, and actually ran recursively
+ ASSERT_EQ(recursionDepth, 2);
+ ASSERT_TRUE(_tl->ranSource());
+ ASSERT_TRUE(_tl->ranSink());
+ ASSERT_EQ(_ssm->state(), State::Ended);
+}
+
} // namespace
} // namespace mongo
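
The terminate tests above coordinate two threads with the SimpleEvent helper: a runner thread advances the SSM until it parks inside a hook at the state under test, and the test thread terminates the SSM before releasing it. Here is that handoff reduced to a standalone sketch, with std:: in place of mongo's stdx:: wrappers and comments standing in for the SSM calls:

// Standalone sketch of the two-event handoff in TerminateWorksForAllStates:
// the runner parks at the desired state, the main thread terminates the SSM,
// then releases the runner.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class SimpleEvent {
public:
    void signal() {
        std::unique_lock<std::mutex> lk(_mutex);
        _signaled = true;
        _cond.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cond.wait(lk, [this] { return _signaled; });
        _signaled = false;  // auto-reset so the event is reusable per state
    }

private:
    std::mutex _mutex;
    std::condition_variable _cond;
    bool _signaled = false;
};

int main() {
    SimpleEvent atDesiredState, okayToContinue;

    std::thread runner([&] {
        // Stand-in for the SSM reaching the state under test.
        atDesiredState.signal();  // "I am at the desired state"
        okayToContinue.wait();    // park until the test terminates the SSM
        // Stand-in for running on to State::Ended after terminate().
    });

    atDesiredState.wait();        // test: wait for the runner to park
    // ... the real test calls _ssm->terminate() here ...
    okayToContinue.signal();      // release the runner to finish
    runner.join();
    std::cout << "handoff complete\n";
    return 0;
}

Resetting _signaled inside wait() makes the event auto-resetting, which is why the tests can reuse the same atDesiredState/okayToContinue pair across every state in the loop.
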