diff options
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r-- | src/mongo/transport/service_state_machine.h | 74 |
1 files changed, 50 insertions, 24 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index 7794cdfbfce..0572c4f6ac8 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -35,7 +35,6 @@ #include "mongo/base/status.h" #include "mongo/config.h" -#include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" @@ -45,11 +44,9 @@ #include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_mode.h" -#include "mongo/util/future.h" #include "mongo/util/net/ssl_manager.h" namespace mongo { -namespace transport { /* * The ServiceStateMachine holds the state of a single client connection and represents the @@ -66,9 +63,17 @@ public: ServiceStateMachine& operator=(ServiceStateMachine&&) = delete; /* - * Construct a ServiceStateMachine for a given Client. + * Creates a new ServiceStateMachine for a given session/service context. If sync is true, + * then calls into the transport layer will block while they complete, otherwise they will + * be handled asynchronously. */ - ServiceStateMachine(ServiceContext::UniqueClient client); + static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext, + transport::SessionHandle session, + transport::Mode transportMode); + + ServiceStateMachine(ServiceContext* svcContext, + transport::SessionHandle session, + transport::Mode transportMode); /* * Any state may transition to EndSession in case of an error, otherwise the valid state @@ -101,9 +106,31 @@ public: enum class Ownership { kUnowned, kOwned, kStatic }; /* - * start() schedules a call to _runOnce() in the future. + * runNext() will run the current state of the state machine. It also handles all the error + * handling and state management for requests. + * + * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack + * if they have just completed a database operation to make sure that this doesn't infinitely + * recurse. + * + * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take + * ownership of the SSM, it will call scheduleNext() and return immediately. + */ + void runNext(); + + /* + * start() schedules a call to runNext() in the future. + * + * It is guaranteed to unwind the stack, and not call runNext() recursively, but is not + * guaranteed that runNext() will run after this return + */ + void start(Ownership ownershipModel); + + /* + * Set the executor to be used for the next call to runNext(). This allows switching between + * thread models after the SSM has started. */ - void start(); + void setServiceExecutor(transport::ServiceExecutor* executor); /* * Gets the current state of connection for testing/diagnostic purposes. @@ -133,8 +160,7 @@ public: private: /* - * A class that wraps up lifetime management of the _dbClient and _threadName for - * each step in _runOnce(); + * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); */ class ThreadGuard; friend class ThreadGuard; @@ -162,15 +188,18 @@ private: const transport::SessionHandle& _session() const; /* - * Gets the transport::ServiceExecutor associated with this connection. + * This is the actual implementation of runNext() that gets called after the ThreadGuard + * has been successfully created. If any callbacks (like sourceCallback()) need to call + * runNext() and already own a ThreadGuard, they should call this with that guard as the + * argument. */ - ServiceExecutor* _executor(); + void _runNextInGuard(ThreadGuard guard); /* * This function actually calls into the database and processes a request. It's broken out * into its own inline function for better readability. */ - Future<void> _processMessage(ThreadGuard guard); + inline void _processMessage(ThreadGuard guard); /* * These get called by the TransportLayer when requested network I/O has completed. @@ -182,8 +211,8 @@ private: * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just * before waiting on the TL. */ - Future<void> _sourceMessage(ThreadGuard guard); - Future<void> _sinkMessage(ThreadGuard guard); + void _sourceMessage(ThreadGuard guard); + void _sinkMessage(ThreadGuard guard, Message toSink); /* * Releases all the resources associated with the session and call the cleanupHook. @@ -191,30 +220,27 @@ private: void _cleanupSession(ThreadGuard guard); /* - * This is the initial function called at the beginning of a thread's lifecycle in the - * TransportLayer. - */ - void _runOnce(); - - /* * Releases all the resources associated with the exhaust request. */ void _cleanupExhaustResources() noexcept; AtomicWord<State> _state{State::Created}; + ServiceEntryPoint* _sep; + transport::Mode _transportMode; + ServiceContext* const _serviceContext; - ServiceEntryPoint* const _sep; + transport::ServiceExecutor* _serviceExecutor; transport::SessionHandle _sessionHandle; + const std::string _threadName; ServiceContext::UniqueClient _dbClient; - Client* _dbClientPtr; + const Client* _dbClientPtr; std::function<void()> _cleanupHook; bool _inExhaust = false; boost::optional<MessageCompressorId> _compressorId; Message _inMessage; - Message _outMessage; // Allows delegating destruction of opCtx to another function to potentially remove its cost // from the critical path. This is currently only used in `_processMessage()`. @@ -224,6 +250,7 @@ private: #if MONGO_CONFIG_DEBUG_BUILD AtomicWord<stdx::thread::id> _owningThread; #endif + std::string _oldThreadName; }; template <typename T> @@ -254,5 +281,4 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) { return stream; } -} // namespace transport } // namespace mongo |