summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-09-14 04:26:44 +0000
committerBen Caimano <ben.caimano@10gen.com>2020-09-17 02:08:00 +0000
commit5f3f8d97455eb1a599bf29d3ef2c981c3be0f265 (patch)
treea73832f509e0bad5c733772d6784288b18ca6c34
parentdabf0e0842dcedad2485cc19e65db262bb3172f6 (diff)
downloadmongo-5f3f8d97455eb1a599bf29d3ef2c981c3be0f265.tar.gz
SERVER-50947 Implize ServiceStateMachine
-rw-r--r--src/mongo/transport/service_state_machine.cpp348
-rw-r--r--src/mongo/transport/service_state_machine.h164
2 files changed, 257 insertions, 255 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index a526f252b20..6fde0057ed2 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -35,13 +35,19 @@
#include <memory>
+#include "mongo/base/status.h"
#include "mongo/config.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/traffic_recorder.h"
#include "mongo/logv2/log.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
#include "mongo/rpc/message.h"
#include "mongo/rpc/op_msg.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/message_compressor_base.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_fixed.h"
@@ -54,7 +60,9 @@
#include "mongo/util/debug_util.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_peer_info.h"
#include "mongo/util/quick_exit.h"
@@ -163,18 +171,168 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
}
} // namespace
+class ServiceStateMachine::Impl final
+ : public std::enable_shared_from_this<ServiceStateMachine::Impl> {
+public:
+ /*
+ * Any state may transition to EndSession in case of an error, otherwise the valid state
+ * transitions are:
+ * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC)
+ * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust)
+ * Source -> SourceWait -> Process -> Source (fire-and-forget)
+ */
+ enum class State {
+ Created, // The session has been created, but no operations have been performed yet
+ Source, // Request a new Message from the network to handle
+ SourceWait, // Wait for the new Message to arrive from the network
+ Process, // Run the Message through the database
+ SinkWait, // Wait for the database result to be sent by the network
+ EndSession, // End the session - the ServiceStateMachine will be invalid after this
+ Ended // The session has ended. It is illegal to call any method besides
+ // state() if this is the current state.
+ };
+
+ /*
+ * When start() is called with Ownership::kOwned, the SSM will swap the Client/thread name
+ * whenever it runs a stage of the state machine, and then unswap them out when leaving the SSM.
+ *
+ * With Ownership::kStatic, it will assume that the SSM will only ever be run from one thread,
+ * and that thread will not be used for other SSM's. It will swap in the Client/thread name for
+ * the first run and leave them in place.
+ *
+ * kUnowned is used internally to mark that the SSM is inactive.
+ */
+ enum class Ownership { kUnowned, kOwned, kStatic };
+
+ /*
+ * A class that wraps up lifetime management of the _client and _threadName for each step in
+ * runOnce();
+ */
+ class ThreadGuard;
+ class ThreadGuardedExecutor;
+
+ Impl(ServiceContext::UniqueClient client)
+ : _state{State::Created},
+ _serviceContext{client->getServiceContext()},
+ _sep{_serviceContext->getServiceEntryPoint()},
+ _client{std::move(client)},
+ _clientPtr{_client.get()} {}
+
+ void start(ServiceExecutorContext seCtx);
+
+ void setCleanupHook(std::function<void()> hook);
+
+ /*
+ * Terminates the associated transport Session, regardless of tags.
+ *
+ * This will not block on the session terminating cleaning itself up, it returns immediately.
+ */
+ void terminate();
+
+ /*
+ * Terminates the associated transport Session if its tags don't match the supplied tags. If
+ * the session is in a pending state, before any tags have been set, it will not be terminated.
+ *
+ * This will not block on the session terminating cleaning itself up, it returns immediately.
+ */
+ void terminateIfTagsDontMatch(transport::Session::TagMask tags);
+
+ /*
+ * Terminates the associated transport Session if status indicate error.
+ *
+ * This will not block on the session terminating cleaning itself up, it returns immediately.
+ */
+ void terminateAndLogIfError(Status status);
+
+ /*
+ * This function actually calls into the database and processes a request. It's broken out
+ * into its own inline function for better readability.
+ */
+ Future<void> processMessage();
+
+ /*
+ * These get called by the TransportLayer when requested network I/O has completed.
+ */
+ void sourceCallback(Status status);
+ void sinkCallback(Status status);
+
+ /*
+ * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
+ * before waiting on the TL.
+ */
+ Future<void> sourceMessage();
+ Future<void> sinkMessage();
+
+ /*
+ * Releases all the resources associated with the session and call the cleanupHook.
+ */
+ void cleanupSession();
+
+ /*
+ * This is the initial function called at the beginning of a thread's lifecycle in the
+ * TransportLayer.
+ */
+ void runOnce();
+
+ /*
+ * Releases all the resources associated with the exhaust request.
+ */
+ void cleanupExhaustResources() noexcept;
+
+ /*
+ * Gets the current state of connection for testing/diagnostic purposes.
+ */
+ State state() const {
+ return _state.load();
+ }
+
+ /*
+ * Gets the transport::Session associated with this connection
+ */
+ const transport::SessionHandle& session() {
+ return _clientPtr->session();
+ }
+
+ /*
+ * Gets the transport::ServiceExecutor associated with this connection.
+ */
+ ServiceExecutor* executor() {
+ return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor();
+ }
+
+private:
+ AtomicWord<State> _state{State::Created};
+
+ ServiceContext* const _serviceContext;
+ ServiceEntryPoint* const _sep;
+
+ transport::SessionHandle _sessionHandle;
+ ServiceContext::UniqueClient _client;
+ Client* _clientPtr;
+ std::function<void()> _cleanupHook;
+
+ bool _inExhaust = false;
+ boost::optional<MessageCompressorId> _compressorId;
+ Message _inMessage;
+ Message _outMessage;
+
+ // Allows delegating destruction of opCtx to another function to potentially remove its cost
+ // from the critical path. This is currently only used in `processMessage()`.
+ ServiceContext::UniqueOperationContext _killedOpCtx;
+};
+
/*
- * This class wraps up the logic for swapping/unswapping the Client when transitioning
- * between states.
+ * This class wraps up the logic for swapping/unswapping the Client when transitioning between
+ * states.
*
* In debug builds this also ensures that only one thread is working on the SSM at once.
*/
-class ServiceStateMachine::ThreadGuard {
+class ServiceStateMachine::Impl::ThreadGuard {
ThreadGuard(ThreadGuard&) = delete;
ThreadGuard& operator=(ThreadGuard&) = delete;
public:
- explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {
+ explicit ThreadGuard(ServiceStateMachine::Impl* ssm) : _ssm{ssm} {
invariant(_ssm);
if (_ssm->_clientPtr == Client::getCurrent()) {
@@ -250,14 +408,14 @@ public:
}
private:
- ServiceStateMachine* _ssm = nullptr;
+ ServiceStateMachine::Impl* _ssm = nullptr;
bool _haveTakenOwnership = false;
std::string _oldThreadName;
};
auto getThreadGuardedExecutor =
- Client::declareDecoration<std::shared_ptr<ServiceStateMachine::ThreadGuardedExecutor>>();
+ Client::declareDecoration<std::shared_ptr<ServiceStateMachine::Impl::ThreadGuardedExecutor>>();
/*
* A ThreadGuardedExecutor is a client decoration that can wrap any OutOfLineExecutor to allow
@@ -269,8 +427,8 @@ auto getThreadGuardedExecutor =
* pointer) to both ServiceStateMachine and ThreadGuardedExecutor, thus accessing these objects
* through raw pointers (e.g., `this`) is considered safe.
*/
-class ServiceStateMachine::ThreadGuardedExecutor
- : public std::enable_shared_from_this<ServiceStateMachine::ThreadGuardedExecutor> {
+class ServiceStateMachine::Impl::ThreadGuardedExecutor
+ : public std::enable_shared_from_this<ServiceStateMachine::Impl::ThreadGuardedExecutor> {
public:
// Wraps an instance of OutOfLineExecutor and delegates scheduling to ThreadGuardedExecutor
class WrappedExecutor : public OutOfLineExecutor,
@@ -294,7 +452,8 @@ public:
ThreadGuardedExecutor() = delete;
ThreadGuardedExecutor(const ThreadGuardedExecutor&) = delete;
- explicit ThreadGuardedExecutor(std::weak_ptr<ServiceStateMachine> ssm) : _ssm(std::move(ssm)) {}
+ explicit ThreadGuardedExecutor(std::weak_ptr<ServiceStateMachine::Impl> ssm)
+ : _ssm(std::move(ssm)) {}
ThreadGuardedExecutor(ThreadGuardedExecutor&& other)
: _isBusy{other._isBusy.load()}, _ssm(other._ssm), _guard(std::move(other._guard)) {}
@@ -337,7 +496,7 @@ public:
}
invariant(!_guard.has_value());
- _guard = ThreadGuard(ssm.get());
+ _guard.emplace(ssm.get());
LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Acquired ThreadGuard in scheduled task");
ON_BLOCK_EXIT([&] {
@@ -366,38 +525,23 @@ private:
// Set to `true` if the executor is busy running a task on behalf of the corresponding client.
AtomicWord<bool> _isBusy{false};
- std::weak_ptr<ServiceStateMachine> _ssm;
+ std::weak_ptr<ServiceStateMachine::Impl> _ssm;
boost::optional<ThreadGuard> _guard;
};
-ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
- : _state{State::Created},
- _serviceContext{client->getServiceContext()},
- _sep{_serviceContext->getServiceEntryPoint()},
- _client{std::move(client)},
- _clientPtr{_client.get()} {}
-
-const transport::SessionHandle& ServiceStateMachine::_session() {
- return _clientPtr->session();
-}
-
-ServiceExecutor* ServiceStateMachine::_executor() {
- return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor();
-}
-
-Future<void> ServiceStateMachine::_sourceMessage() {
+Future<void> ServiceStateMachine::Impl::sourceMessage() {
invariant(_inMessage.empty());
invariant(_state.load() == State::Source);
_state.store(State::SourceWait);
auto sourceMsgImpl = [&] {
- const auto& transportMode = _executor()->transportMode();
+ const auto& transportMode = executor()->transportMode();
if (transportMode == transport::Mode::kSynchronous) {
MONGO_IDLE_THREAD_BLOCK;
- return Future<Message>::makeReady(_session()->sourceMessage());
+ return Future<Message>::makeReady(session()->sourceMessage());
} else {
invariant(transportMode == transport::Mode::kAsynchronous);
- return _session()->asyncSourceMessage();
+ return session()->asyncSourceMessage();
}
};
@@ -406,49 +550,49 @@ Future<void> ServiceStateMachine::_sourceMessage() {
_inMessage = std::move(msg.getValue());
invariant(!_inMessage.empty());
}
- _sourceCallback(msg.getStatus());
+ sourceCallback(msg.getStatus());
return Status::OK();
});
}
-Future<void> ServiceStateMachine::_sinkMessage() {
+Future<void> ServiceStateMachine::Impl::sinkMessage() {
// Sink our response to the client
invariant(_state.load() == State::Process);
_state.store(State::SinkWait);
auto toSink = std::exchange(_outMessage, {});
auto sinkMsgImpl = [&] {
- const auto& transportMode = _executor()->transportMode();
+ const auto& transportMode = executor()->transportMode();
if (transportMode == transport::Mode::kSynchronous) {
// We don't consider ourselves idle while sending the reply since we are still doing
// work on behalf of the client. Contrast that with sourceMessage() where we are waiting
// for the client to send us more work to do.
- return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));
+ return Future<void>::makeReady(session()->sinkMessage(std::move(toSink)));
} else {
invariant(transportMode == transport::Mode::kAsynchronous);
- return _session()->asyncSinkMessage(std::move(toSink));
+ return session()->asyncSinkMessage(std::move(toSink));
}
};
return sinkMsgImpl().onCompletion([this](Status status) {
- _sinkCallback(std::move(status));
+ sinkCallback(std::move(status));
return Status::OK();
});
}
-void ServiceStateMachine::_sourceCallback(Status status) {
+void ServiceStateMachine::Impl::sourceCallback(Status status) {
auto guard = ThreadGuard(this);
invariant(state() == State::SourceWait);
- auto remote = _session()->remote();
+ auto remote = session()->remote();
if (status.isOK()) {
_state.store(State::Process);
- // If the sourceMessage succeeded then we can move to on to process the message. We
- // simply return from here and the future chain in _runOnce() will continue to the
- // next state normally.
+ // If the sourceMessage succeeded then we can move to on to process the message. We simply
+ // return from here and the future chain in runOnce() will continue to the next state
+ // normally.
// If any other issues arise, close the session.
} else if (ErrorCodes::isInterruption(status.code()) ||
@@ -473,13 +617,13 @@ void ServiceStateMachine::_sourceCallback(Status status) {
"Error receiving request from client. Ending connection from remote",
"error"_attr = status,
"remote"_attr = remote,
- "connectionId"_attr = _session()->id());
+ "connectionId"_attr = session()->id());
_state.store(State::EndSession);
}
uassertStatusOK(status);
}
-void ServiceStateMachine::_sinkCallback(Status status) {
+void ServiceStateMachine::Impl::sinkCallback(Status status) {
auto guard = ThreadGuard(this);
invariant(state() == State::SinkWait);
@@ -493,8 +637,8 @@ void ServiceStateMachine::_sinkCallback(Status status) {
LOGV2(22989,
"Error sending response to client. Ending connection from remote",
"error"_attr = status,
- "remote"_attr = _session()->remote(),
- "connectionId"_attr = _session()->id());
+ "remote"_attr = session()->remote(),
+ "connectionId"_attr = session()->id());
_state.store(State::EndSession);
uassertStatusOK(status);
} else if (_inExhaust) {
@@ -504,13 +648,13 @@ void ServiceStateMachine::_sinkCallback(Status status) {
}
}
-Future<void> ServiceStateMachine::_processMessage() {
+Future<void> ServiceStateMachine::Impl::processMessage() {
invariant(!_inMessage.empty());
TrafficRecorder::get(_serviceContext)
- .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
+ .observe(session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
- auto& compressorMgr = MessageCompressorManager::forSession(_session());
+ auto& compressorMgr = MessageCompressorManager::forSession(session());
_compressorId = boost::none;
if (_inMessage.operation() == dbCompressed) {
@@ -553,7 +697,7 @@ Future<void> ServiceStateMachine::_processMessage() {
toSink.header().setResponseToMsgId(_inMessage.header().getId());
if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
#ifdef MONGO_CONFIG_SSL
- if (!SSLPeerInfo::forSession(_session()).isTLS) {
+ if (!SSLPeerInfo::forSession(session()).isTLS) {
OpMsg::appendChecksum(&toSink);
}
#else
@@ -578,7 +722,7 @@ Future<void> ServiceStateMachine::_processMessage() {
}
TrafficRecorder::get(_serviceContext)
- .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink);
+ .observe(session(), _serviceContext->getPreciseClockSource()->now(), toSink);
_outMessage = std::move(toSink);
} else {
@@ -590,7 +734,7 @@ Future<void> ServiceStateMachine::_processMessage() {
});
}
-void ServiceStateMachine::start(ServiceExecutorContext seCtx) {
+void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) {
{
stdx::lock_guard lk(*_clientPtr);
@@ -602,34 +746,32 @@ void ServiceStateMachine::start(ServiceExecutorContext seCtx) {
ThreadGuardedExecutor::get(_clientPtr)
->schedule(
- _executor(),
+ executor(),
GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
// If this is the first run of the SSM, then update its state to Source
if (state() == State::Created) {
_state.store(State::Source);
}
- _runOnce();
+ runOnce();
}));
}
-void ServiceStateMachine::_runOnce() {
- makeReadyFutureWith([] {})
- .thenRunOn(ThreadGuardedExecutor::get(_clientPtr)->wrapExecutor(_executor()))
- .then([this]() -> Future<void> {
- if (_inExhaust) {
- return Status::OK();
- } else {
- return _sourceMessage();
- }
- })
- .then([this]() { return _processMessage(); })
+void ServiceStateMachine::Impl::runOnce() {
+ makeReadyFutureWith([&]() -> Future<void> {
+ if (_inExhaust) {
+ return Status::OK();
+ } else {
+ return sourceMessage();
+ }
+ })
+ .then([this]() { return processMessage(); })
.then([this]() -> Future<void> {
if (_outMessage.empty()) {
return Status::OK();
}
- return _sinkMessage();
+ return sinkMessage();
})
.getAsync([this](Status status) {
// Destroy the opCtx (already killed) here, to potentially use the delay between
@@ -639,9 +781,9 @@ void ServiceStateMachine::_runOnce() {
}
if (!status.isOK()) {
_state.store(State::EndSession);
- // The service executor failed to schedule the task. This could for example be
- // that we failed to start a worker thread. Terminate this connection to leave
- // the system in a valid state.
+ // The service executor failed to schedule the task. This could for example be that
+ // we failed to start a worker thread. Terminate this connection to leave the system
+ // in a valid state.
LOGV2_WARNING_OPTIONS(4910400,
{logv2::LogComponent::kExecutor},
"Terminating session due to error: {error}",
@@ -650,56 +792,46 @@ void ServiceStateMachine::_runOnce() {
terminate();
ThreadGuardedExecutor::get(_clientPtr)
- ->schedule(_executor(),
+ ->schedule(executor(),
GuaranteedExecutor::enforceRunOnce(
[this, anchor = shared_from_this()](Status status) {
- _cleanupSession();
+ cleanupSession();
}));
return;
}
ThreadGuardedExecutor::get(_clientPtr)
- ->schedule(_executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) {
- _runOnce();
+ ->schedule(executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) {
+ runOnce();
}));
});
}
-void ServiceStateMachine::terminate() {
+void ServiceStateMachine::Impl::terminate() {
if (state() == State::Ended)
return;
- _session()->end();
+ session()->end();
}
-void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
+void ServiceStateMachine::Impl::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
if (state() == State::Ended)
return;
- auto sessionTags = _session()->getTags();
+ auto sessionTags = session()->getTags();
- // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have
- // been set, then skip the termination check.
+ // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been
+ // set, then skip the termination check.
if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) {
- LOGV2(22991,
- "Skip closing connection for connection",
- "connectionId"_attr = _session()->id());
+ LOGV2(
+ 22991, "Skip closing connection for connection", "connectionId"_attr = session()->id());
return;
}
terminate();
}
-void ServiceStateMachine::setCleanupHook(std::function<void()> hook) {
- invariant(state() == State::Created);
- _cleanupHook = std::move(hook);
-}
-
-ServiceStateMachine::State ServiceStateMachine::state() {
- return _state.load();
-}
-
-void ServiceStateMachine::_terminateAndLogIfError(Status status) {
+void ServiceStateMachine::Impl::terminateAndLogIfError(Status status) {
if (!status.isOK()) {
LOGV2_WARNING_OPTIONS(22993,
{logv2::LogComponent::kExecutor},
@@ -710,7 +842,7 @@ void ServiceStateMachine::_terminateAndLogIfError(Status status) {
}
}
-void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
+void ServiceStateMachine::Impl::cleanupExhaustResources() noexcept try {
if (!_inExhaust) {
return;
}
@@ -720,8 +852,8 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
auto cursorId = request.body["getMore"].Long();
auto opCtx = Client::getCurrent()->makeOperationContext();
// Fire and forget. This is a best effort attempt to immediately clean up the exhaust
- // cursor. If the killCursors request fails here for any reasons, it will still be
- // cleaned up once the cursor times out.
+ // cursor. If the killCursors request fails here for any reasons, it will still be cleaned
+ // up once the cursor times out.
_sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get();
}
} catch (const DBException& e) {
@@ -731,14 +863,19 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
"error"_attr = e.toStatus());
}
-void ServiceStateMachine::_cleanupSession() {
+void ServiceStateMachine::Impl::setCleanupHook(std::function<void()> hook) {
+ invariant(state() == State::Created);
+ _cleanupHook = std::move(hook);
+}
+
+void ServiceStateMachine::Impl::cleanupSession() {
// Ensure the delayed destruction of opCtx always happens before doing the cleanup.
if (MONGO_likely(_killedOpCtx)) {
_killedOpCtx.reset();
}
invariant(!_killedOpCtx);
- _cleanupExhaustResources();
+ cleanupExhaustResources();
{
stdx::lock_guard lk(*_clientPtr);
@@ -756,5 +893,24 @@ void ServiceStateMachine::_cleanupSession() {
_outMessage.reset();
}
+ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
+ : _impl{std::make_shared<Impl>(std::move(client))} {}
+
+void ServiceStateMachine::start(ServiceExecutorContext seCtx) {
+ _impl->start(std::move(seCtx));
+}
+
+void ServiceStateMachine::setCleanupHook(std::function<void()> hook) {
+ _impl->setCleanupHook(std::move(hook));
+}
+
+void ServiceStateMachine::terminate() {
+ _impl->terminate();
+}
+
+void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) {
+ _impl->terminateIfTagsDontMatch(tags);
+}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 1962715e81a..917ca6ca09e 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -29,24 +29,13 @@
#pragma once
-#include <atomic>
#include <functional>
#include <memory>
-#include "mongo/base/status.h"
-#include "mongo/config.h"
-#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/message_compressor_base.h"
-#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_mode.h"
-#include "mongo/util/future.h"
-#include "mongo/util/net/ssl_manager.h"
namespace mongo {
namespace transport {
@@ -57,62 +46,27 @@ namespace transport {
* ServiceEntryPoint and TransportLayer that ties network and database logic together for a
* user.
*/
-class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {
-public:
- class ThreadGuardedExecutor;
-
+class ServiceStateMachine {
ServiceStateMachine(ServiceStateMachine&) = delete;
ServiceStateMachine& operator=(ServiceStateMachine&) = delete;
ServiceStateMachine(ServiceStateMachine&&) = delete;
ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;
+public:
+ class Impl;
+
/*
* Construct a ServiceStateMachine for a given Client.
*/
ServiceStateMachine(ServiceContext::UniqueClient client);
/*
- * Any state may transition to EndSession in case of an error, otherwise the valid state
- * transitions are:
- * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC)
- * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust)
- * Source -> SourceWait -> Process -> Source (fire-and-forget)
- */
- enum class State {
- Created, // The session has been created, but no operations have been performed yet
- Source, // Request a new Message from the network to handle
- SourceWait, // Wait for the new Message to arrive from the network
- Process, // Run the Message through the database
- SinkWait, // Wait for the database result to be sent by the network
- EndSession, // End the session - the ServiceStateMachine will be invalid after this
- Ended // The session has ended. It is illegal to call any method besides
- // state() if this is the current state.
- };
-
- /*
- * When start() is called with Ownership::kOwned, the SSM will swap the Client/thread name
- * whenever it runs a stage of the state machine, and then unswap them out when leaving the SSM.
- *
- * With Ownership::kStatic, it will assume that the SSM will only ever be run from one thread,
- * and that thread will not be used for other SSM's. It will swap in the Client/thread name
- * for the first run and leave them in place.
- *
- * kUnowned is used internally to mark that the SSM is inactive.
- */
- enum class Ownership { kUnowned, kOwned, kStatic };
-
- /*
* start() schedules a call to _runOnce() in the future.
*/
void start(ServiceExecutorContext seCtx);
/*
- * Gets the current state of connection for testing/diagnostic purposes.
- */
- State state();
-
- /*
* Terminates the associated transport Session, regardless of tags.
*
* This will not block on the session terminating cleaning itself up, it returns immediately.
@@ -134,116 +88,8 @@ public:
void setCleanupHook(std::function<void()> hook);
private:
- /*
- * A class that wraps up lifetime management of the _dbClient and _threadName for
- * each step in _runOnce();
- */
- class ThreadGuard;
-
- /*
- * Terminates the associated transport Session if status indicate error.
- *
- * This will not block on the session terminating cleaning itself up, it returns immediately.
- */
- void _terminateAndLogIfError(Status status);
-
- /*
- * Gets the transport::Session associated with this connection
- */
- const transport::SessionHandle& _session();
-
- /*
- * Gets the transport::ServiceExecutor associated with this connection.
- */
- ServiceExecutor* _executor();
-
- /*
- * This function actually calls into the database and processes a request. It's broken out
- * into its own inline function for better readability.
- */
- Future<void> _processMessage();
-
- /*
- * These get called by the TransportLayer when requested network I/O has completed.
- */
- void _sourceCallback(Status status);
- void _sinkCallback(Status status);
-
- /*
- * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
- * before waiting on the TL.
- */
- Future<void> _sourceMessage();
- Future<void> _sinkMessage();
-
- /*
- * Releases all the resources associated with the session and call the cleanupHook.
- */
- void _cleanupSession();
-
- /*
- * This is the initial function called at the beginning of a thread's lifecycle in the
- * TransportLayer.
- */
- void _runOnce();
-
- /*
- * Releases all the resources associated with the exhaust request.
- */
- void _cleanupExhaustResources() noexcept;
-
- AtomicWord<State> _state{State::Created};
-
- ServiceContext* const _serviceContext;
- ServiceEntryPoint* const _sep;
-
- transport::SessionHandle _sessionHandle;
- ServiceContext::UniqueClient _client;
- Client* _clientPtr;
- std::function<void()> _cleanupHook;
-
- bool _inExhaust = false;
- boost::optional<MessageCompressorId> _compressorId;
- Message _inMessage;
- Message _outMessage;
-
- // Allows delegating destruction of opCtx to another function to potentially remove its cost
- // from the critical path. This is currently only used in `_processMessage()`.
- ServiceContext::UniqueOperationContext _killedOpCtx;
-
- AtomicWord<Ownership> _owned{Ownership::kUnowned};
-#if MONGO_CONFIG_DEBUG_BUILD
- AtomicWord<stdx::thread::id> _owningThread;
-#endif
+ std::shared_ptr<Impl> _impl;
};
-template <typename T>
-T& operator<<(T& stream, const ServiceStateMachine::State& state) {
- switch (state) {
- case ServiceStateMachine::State::Created:
- stream << "created";
- break;
- case ServiceStateMachine::State::Source:
- stream << "source";
- break;
- case ServiceStateMachine::State::SourceWait:
- stream << "sourceWait";
- break;
- case ServiceStateMachine::State::Process:
- stream << "process";
- break;
- case ServiceStateMachine::State::SinkWait:
- stream << "sinkWait";
- break;
- case ServiceStateMachine::State::EndSession:
- stream << "endSession";
- break;
- case ServiceStateMachine::State::Ended:
- stream << "ended";
- break;
- }
- return stream;
-}
-
} // namespace transport
} // namespace mongo