summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_state_machine.cpp')
-rw-r--r--src/mongo/transport/service_state_machine.cpp308
1 files changed, 180 insertions, 128 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index d755a5f4c61..624b22dd1d8 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -51,7 +51,6 @@
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/thread_idle_callback.h"
#include "mongo/util/quick_exit.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -91,12 +90,98 @@ bool setExhaustMessage(Message* m, const DbResponse& dbresponse) {
} // namespace
using transport::TransportLayer;
+
+/*
+ * This class wraps up the logic for swapping/unswapping the Client during runNext().
+ */
+class ServiceStateMachine::ThreadGuard {
+ ThreadGuard(ThreadGuard&) = delete;
+ ThreadGuard& operator=(ThreadGuard&) = delete;
+
+public:
+ explicit ThreadGuard(ServiceStateMachine* ssm)
+ : _ssm{ssm},
+ _haveTakenOwnership{!_ssm->_isOwned.test_and_set()},
+ _oldThreadName{getThreadName().toString()} {
+ const auto currentOwningThread = _ssm->_currentOwningThread.load();
+ const auto currentThreadId = stdx::this_thread::get_id();
+
+ // If this is true, then we are the "owner" of the Client and we should swap the
+ // client/thread name before doing any work.
+ if (_haveTakenOwnership) {
+ _ssm->_currentOwningThread.store(currentThreadId);
+
+ // Set up the thread name
+ setThreadName(_ssm->_threadName);
+
+ // These are sanity checks to make sure that the Client is what we expect it to be
+ invariant(!haveClient());
+ invariant(_ssm->_dbClient.get() == _ssm->_dbClientPtr);
+
+ // Swap the current Client so calls to cc() work as expected
+ Client::setCurrent(std::move(_ssm->_dbClient));
+ } else if (currentOwningThread != currentThreadId) {
+ // If the currentOwningThread does not equal the currentThreadId, then another thread
+ // currently "owns" the Client and we should reschedule ourself.
+ _okayToRunNext = false;
+ }
+ }
+
+ ~ThreadGuard() {
+ // If we are not the owner of the SSM, then do nothing. Something higher up the call stack
+ // will have to clean up.
+ if (!_haveTakenOwnership)
+ return;
+
+ // If the session has ended, then assume that it's unsafe to do anything but call the
+ // cleanup hook.
+ if (_ssm->state() == State::Ended) {
+ // The cleanup hook may change as soon as we unlock the mutex, so move it out of the
+ // ssm before unlocking the lock.
+ auto cleanupHook = std::move(_ssm->_cleanupHook);
+ if (cleanupHook)
+ cleanupHook();
+
+ return;
+ }
+
+ // Otherwise swap thread locals and thread names back into the SSM so its ready for the
+ // next run.
+ if (haveClient()) {
+ _ssm->_dbClient = Client::releaseCurrent();
+ }
+ setThreadName(_oldThreadName);
+ _ssm->_isOwned.clear();
+ }
+
+ // This bool operator reflects whether the ThreadGuard was able to take ownership of the thread
+ // either higher up the call chain, or in this call. If this returns false, then it is not safe
+ // to assume the thread has been setup correctly, or that any mutable state of the SSM is safe
+ // to access except for the current _state value.
+ explicit operator bool() const {
+ return _okayToRunNext;
+ }
+
+private:
+ ServiceStateMachine* _ssm;
+ bool _haveTakenOwnership;
+ const std::string _oldThreadName;
+ bool _okayToRunNext = true;
+};
+
+std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ bool sync) {
+ return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), sync);
+}
+
ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
transport::SessionHandle session,
bool sync)
- : _state{State::Source},
+ : _state{State::Created},
_sep{svcContext->getServiceEntryPoint()},
_sync(sync),
+ _serviceContext(svcContext),
_dbClient{svcContext->makeClient("conn", std::move(session))},
_dbClientPtr{_dbClient.get()},
_threadName{str::stream() << "conn" << _dbClient->session()->id()},
@@ -108,55 +193,74 @@ const transport::SessionHandle& ServiceStateMachine::session() const {
}
void ServiceStateMachine::sourceCallback(Status status) {
+ // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
+ // thread.
+ ThreadGuard guard(this);
+ // If the guard wasn't able to take ownership of the thread, then reschedule this call to
+ // runNext() so that this thread can do other useful work with its timeslice instead of going
+ // to sleep while waiting for the SSM to be released.
+ if (!guard) {
+ return scheduleFunc([this, status] { sourceCallback(status); });
+ }
+
// Make sure we just called sourceMessage();
- invariant(_state == State::SourceWait);
+ invariant(state() == State::SourceWait);
auto remote = session()->remote();
if (status.isOK()) {
- _state = State::Process;
+ _state.store(State::Process);
} else if (ErrorCodes::isInterruption(status.code()) ||
ErrorCodes::isNetworkError(status.code())) {
LOG(2) << "Session from " << remote << " encountered a network error during SourceMessage";
- _state = State::EndSession;
+ _state.store(State::EndSession);
} else if (status == TransportLayer::TicketSessionClosedStatus) {
// Our session may have been closed internally.
LOG(2) << "Session from " << remote << " was closed internally during SourceMessage";
- _state = State::EndSession;
+ _state.store(State::EndSession);
} else {
log() << "Error receiving request from client: " << status << ". Ending connection from "
<< remote << " (connection id: " << session()->id() << ")";
- _state = State::EndSession;
+ _state.store(State::EndSession);
}
- // In asyncronous mode this is the entrypoint back into the database from the network layer
- // after a message has been received, so we want to call runNext() to process the message.
- //
- // In synchronous mode, runNext() will fall through to call processMessage() so we avoid
- // the recursive call.
- if (!_sync)
- return runNext();
+ runNextInGuard(guard);
}
void ServiceStateMachine::sinkCallback(Status status) {
- invariant(_state == State::SinkWait);
+ // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
+ // thread.
+ ThreadGuard guard(this);
+ // If the guard wasn't able to take ownership of the thread, then reschedule this call to
+ // runNext() so that this thread can do other useful work with its timeslice instead of going
+ // to sleep while waiting for the SSM to be released.
+ if (!guard) {
+ return scheduleFunc([this, status] { sinkCallback(status); });
+ }
+
+ invariant(state() == State::SinkWait);
if (!status.isOK()) {
log() << "Error sending response to client: " << status << ". Ending connection from "
<< session()->remote() << " (connection id: " << session()->id() << ")";
- _state = State::EndSession;
+ _state.store(State::EndSession);
} else if (inExhaust) {
- _state = State::Process;
+ _state.store(State::Process);
} else {
- _state = State::Source;
+ _state.store(State::Source);
}
- return scheduleNext();
+ // If the session ended, then runNext to clean it up
+ if (state() == State::EndSession) {
+ runNextInGuard(guard);
+ } else { // Otherwise scheduleNext to unwind the stack and run the next step later
+ scheduleNext();
+ }
}
void ServiceStateMachine::processMessage() {
// This may have been called just after a failure to source a message, in which case this
// should return early so the session can be cleaned up.
- if (_state != State::Process) {
+ if (state() != State::Process) {
return;
}
invariant(!_inMessage.empty());
@@ -174,7 +278,7 @@ void ServiceStateMachine::processMessage() {
networkCounter.hitLogicalIn(_inMessage.size());
- // 2. Pass sourced Message to handler to generate response.
+ // Pass sourced Message to handler to generate response.
auto opCtx = cc().makeOperationContext();
// The handleRequest is implemented in a subclass for mongod/mongos and actually all the
@@ -185,7 +289,7 @@ void ServiceStateMachine::processMessage() {
// up in currentOp results after the response reaches the client
opCtx.reset();
- // 3. Format our response, if we have one
+ // Format our response, if we have one
Message& toSink = dbresponse.response;
if (!toSink.empty()) {
toSink.header().setId(nextMessageId());
@@ -207,9 +311,10 @@ void ServiceStateMachine::processMessage() {
toSink = swm.getValue();
}
- // 4. Sink our response to the client
+ // Sink our response to the client
auto ticket = session()->sinkMessage(toSink);
- _state = State::SinkWait;
+
+ _state.store(State::SinkWait);
if (_sync) {
sinkCallback(session()->getTransportLayer()->wait(std::move(ticket)));
} else {
@@ -217,88 +322,44 @@ void ServiceStateMachine::processMessage() {
std::move(ticket), [this](Status status) { sinkCallback(status); });
}
} else {
- _state = State::Source;
+ _state.store(State::Source);
_inMessage.reset();
return scheduleNext();
}
}
-/*
- * This class wraps up the logic for swapping/unswapping the Client during runNext().
- */
-class ServiceStateMachine::ThreadGuard {
- ThreadGuard(ThreadGuard&) = delete;
- ThreadGuard& operator=(ThreadGuard&) = delete;
-
-public:
- explicit ThreadGuard(ServiceStateMachine* ssm)
- : _ssm{ssm},
- _haveTakenOwnership{!_ssm->_isOwned.test_and_set()},
- _oldThreadName{getThreadName().toString()} {
- const auto currentOwningThread = _ssm->_currentOwningThread.load();
- const auto currentThreadId = stdx::this_thread::get_id();
-
- // If this is true, then we are the "owner" of the Client and we should swap the
- // client/thread name before doing any work.
- if (_haveTakenOwnership) {
- _ssm->_currentOwningThread.store(currentThreadId);
-
- // Set up the thread name
- setThreadName(_ssm->_threadName);
-
- // These are sanity checks to make sure that the Client is what we expect it to be
- invariant(!haveClient());
- invariant(_ssm->_dbClient.get() == _ssm->_dbClientPtr);
-
- // Swap the current Client so calls to cc() work as expected
- Client::setCurrent(std::move(_ssm->_dbClient));
- } else if (currentOwningThread != currentThreadId) {
- // If the currentOwningThread does not equal the currentThreadId, then another thread
- // currently "owns" the Client and we should reschedule ourself.
- _okayToRunNext = false;
- }
- }
-
- ~ThreadGuard() {
- if (!_haveTakenOwnership)
- return;
-
- if (haveClient()) {
- _ssm->_dbClient = Client::releaseCurrent();
- }
- setThreadName(_oldThreadName);
- _ssm->_isOwned.clear();
+void ServiceStateMachine::runNext() {
+ // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
+ // thread.
+ ThreadGuard guard(this);
+ // If the guard wasn't able to take ownership of the thread, then reschedule this call to
+ // runNext() so that this thread can do other useful work with its timeslice instead of going
+ // to sleep while waiting for the SSM to be released.
+ if (!guard) {
+ return scheduleNext();
}
+ return runNextInGuard(guard);
+}
- void dismiss() {
- _haveTakenOwnership = false;
- }
+void ServiceStateMachine::runNextInGuard(ThreadGuard& guard) {
+ auto curState = state();
+ invariant(curState != State::Ended);
- explicit operator bool() const {
- return _okayToRunNext;
+ // If this is the first run of the SSM, then update its state to Source
+ if (curState == State::Created) {
+ curState = State::Source;
+ _state.store(curState);
}
-private:
- ServiceStateMachine* _ssm;
- bool _haveTakenOwnership;
- const std::string _oldThreadName;
- bool _okayToRunNext = true;
-};
-
-void ServiceStateMachine::runNext() {
- ThreadGuard guard(this);
- if (!guard)
- return scheduleNext();
-
// Make sure the current Client got set correctly
invariant(Client::getCurrent() == _dbClientPtr);
try {
- switch (_state) {
+ switch (curState) {
case State::Source: {
invariant(_inMessage.empty());
auto ticket = session()->sourceMessage(&_inMessage);
- _state = State::SourceWait;
+ _state.store(State::SourceWait);
if (_sync) {
MONGO_IDLE_THREAD_BLOCK;
sourceCallback(session()->getTransportLayer()->wait(std::move(ticket)));
@@ -319,14 +380,14 @@ void ServiceStateMachine::runNext() {
MONGO_UNREACHABLE;
}
- if (_state == State::EndSession) {
- guard.dismiss();
- endSession();
- }
-
if ((_counter++ & 0xf) == 0) {
markThreadIdle();
- };
+ }
+
+ if (state() == State::EndSession) {
+ cleanupSession();
+ }
+
return;
} catch (const AssertionException& e) {
log() << "AssertionException handling request, closing client connection: " << e;
@@ -340,16 +401,33 @@ void ServiceStateMachine::runNext() {
quickExit(EXIT_UNCAUGHT);
}
- _state = State::EndSession;
- guard.dismiss();
- endSession();
+ _state.store(State::EndSession);
+ cleanupSession();
}
-// TODO: Right now this is a noop because we only run in synchronous mode. When an async
-// TransportLayer is written, this will call the serviceexecutor to schedule calls to runNext().
-void ServiceStateMachine::scheduleNext() {}
+void ServiceStateMachine::scheduleNext() {
+ maybeScheduleFunc(_serviceContext->getServiceExecutor(), [this] { runNext(); });
+}
+
+void ServiceStateMachine::terminate() {
+ if (state() == State::Ended)
+ return;
+ auto tl = session()->getTransportLayer();
+ tl->end(session());
+}
+
+void ServiceStateMachine::setCleanupHook(stdx::function<void()> hook) {
+ invariant(state() == State::Created);
+ _cleanupHook = std::move(hook);
+}
+
+ServiceStateMachine::State ServiceStateMachine::state() {
+ return _state.load();
+}
+
+void ServiceStateMachine::cleanupSession() {
+ _state.store(State::Ended);
-void ServiceStateMachine::endSession() {
auto tl = session()->getTransportLayer();
_inMessage.reset();
@@ -362,32 +440,6 @@ void ServiceStateMachine::endSession() {
const char* word = (conns == 1 ? " connection" : " connections");
log() << "end connection " << remote << " (" << conns << word << " now open)";
}
-
- _state = State::Ended;
-}
-
-std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state) {
- switch (state) {
- case ServiceStateMachine::State::Source:
- stream << "source";
- break;
- case ServiceStateMachine::State::SourceWait:
- stream << "sourceWait";
- break;
- case ServiceStateMachine::State::Process:
- stream << "process";
- break;
- case ServiceStateMachine::State::SinkWait:
- stream << "sinkWait";
- break;
- case ServiceStateMachine::State::EndSession:
- stream << "endSession";
- break;
- case ServiceStateMachine::State::Ended:
- stream << "ended";
- break;
- }
- return stream;
}
} // namespace mongo