diff options
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r-- | src/mongo/transport/service_state_machine.h | 74 |
1 files changed, 24 insertions, 50 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index 0572c4f6ac8..7794cdfbfce 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/config.h" +#include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" @@ -44,9 +45,11 @@ #include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_mode.h" +#include "mongo/util/future.h" #include "mongo/util/net/ssl_manager.h" namespace mongo { +namespace transport { /* * The ServiceStateMachine holds the state of a single client connection and represents the @@ -63,17 +66,9 @@ public: ServiceStateMachine& operator=(ServiceStateMachine&&) = delete; /* - * Creates a new ServiceStateMachine for a given session/service context. If sync is true, - * then calls into the transport layer will block while they complete, otherwise they will - * be handled asynchronously. + * Construct a ServiceStateMachine for a given Client. */ - static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext, - transport::SessionHandle session, - transport::Mode transportMode); - - ServiceStateMachine(ServiceContext* svcContext, - transport::SessionHandle session, - transport::Mode transportMode); + ServiceStateMachine(ServiceContext::UniqueClient client); /* * Any state may transition to EndSession in case of an error, otherwise the valid state @@ -106,31 +101,9 @@ public: enum class Ownership { kUnowned, kOwned, kStatic }; /* - * runNext() will run the current state of the state machine. It also handles all the error - * handling and state management for requests. - * - * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack - * if they have just completed a database operation to make sure that this doesn't infinitely - * recurse. - * - * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take - * ownership of the SSM, it will call scheduleNext() and return immediately. - */ - void runNext(); - - /* - * start() schedules a call to runNext() in the future. - * - * It is guaranteed to unwind the stack, and not call runNext() recursively, but is not - * guaranteed that runNext() will run after this return - */ - void start(Ownership ownershipModel); - - /* - * Set the executor to be used for the next call to runNext(). This allows switching between - * thread models after the SSM has started. + * start() schedules a call to _runOnce() in the future. */ - void setServiceExecutor(transport::ServiceExecutor* executor); + void start(); /* * Gets the current state of connection for testing/diagnostic purposes. @@ -160,7 +133,8 @@ public: private: /* - * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + * A class that wraps up lifetime management of the _dbClient and _threadName for + * each step in _runOnce(); */ class ThreadGuard; friend class ThreadGuard; @@ -188,18 +162,15 @@ private: const transport::SessionHandle& _session() const; /* - * This is the actual implementation of runNext() that gets called after the ThreadGuard - * has been successfully created. If any callbacks (like sourceCallback()) need to call - * runNext() and already own a ThreadGuard, they should call this with that guard as the - * argument. + * Gets the transport::ServiceExecutor associated with this connection. */ - void _runNextInGuard(ThreadGuard guard); + ServiceExecutor* _executor(); /* * This function actually calls into the database and processes a request. It's broken out * into its own inline function for better readability. */ - inline void _processMessage(ThreadGuard guard); + Future<void> _processMessage(ThreadGuard guard); /* * These get called by the TransportLayer when requested network I/O has completed. @@ -211,8 +182,8 @@ private: * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just * before waiting on the TL. */ - void _sourceMessage(ThreadGuard guard); - void _sinkMessage(ThreadGuard guard, Message toSink); + Future<void> _sourceMessage(ThreadGuard guard); + Future<void> _sinkMessage(ThreadGuard guard); /* * Releases all the resources associated with the session and call the cleanupHook. @@ -220,27 +191,30 @@ private: void _cleanupSession(ThreadGuard guard); /* + * This is the initial function called at the beginning of a thread's lifecycle in the + * TransportLayer. + */ + void _runOnce(); + + /* * Releases all the resources associated with the exhaust request. */ void _cleanupExhaustResources() noexcept; AtomicWord<State> _state{State::Created}; - ServiceEntryPoint* _sep; - transport::Mode _transportMode; - ServiceContext* const _serviceContext; - transport::ServiceExecutor* _serviceExecutor; + ServiceEntryPoint* const _sep; transport::SessionHandle _sessionHandle; - const std::string _threadName; ServiceContext::UniqueClient _dbClient; - const Client* _dbClientPtr; + Client* _dbClientPtr; std::function<void()> _cleanupHook; bool _inExhaust = false; boost::optional<MessageCompressorId> _compressorId; Message _inMessage; + Message _outMessage; // Allows delegating destruction of opCtx to another function to potentially remove its cost // from the critical path. This is currently only used in `_processMessage()`. @@ -250,7 +224,6 @@ private: #if MONGO_CONFIG_DEBUG_BUILD AtomicWord<stdx::thread::id> _owningThread; #endif - std::string _oldThreadName; }; template <typename T> @@ -281,4 +254,5 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) { return stream; } +} // namespace transport } // namespace mongo |