diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2017-06-27 14:04:53 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2017-07-14 16:19:40 -0400 |
commit | 1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69 (patch) | |
tree | 4cad8d64f1ace9bc13aea786b460872b1ce466c3 /src/mongo/transport/service_state_machine.h | |
parent | e0b06a9da3c0c6071f4e636f3c3ba3e8851c5db0 (diff) | |
download | mongo-1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69.tar.gz |
SERVER-29402 Implement ServiceExecutor and fixed-size test executor
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r-- | src/mongo/transport/service_state_machine.h | 114 |
1 files changed, 96 insertions, 18 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index d1750a6eb98..e3212ad21b0 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -33,6 +33,8 @@ #include "mongo/base/status.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/transport/session.h" @@ -40,25 +42,29 @@ namespace mongo { class ServiceEntryPoint; -namespace transport { -class ServiceExecutorBase; -} // namespace transport - /* * The ServiceStateMachine holds the state of a single client connection and represents the * lifecycle of each user request as a state machine. It is the glue between the stateless * ServiceEntryPoint and TransportLayer that ties network and database logic together for a * user. */ -class ServiceStateMachine { +class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> { ServiceStateMachine(ServiceStateMachine&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&) = delete; public: - ServiceStateMachine() = default; ServiceStateMachine(ServiceStateMachine&&) = default; ServiceStateMachine& operator=(ServiceStateMachine&&) = default; + /* + * Creates a new ServiceStateMachine for a given session/service context. If sync is true, + * then calls into the transport layer will block while they complete, otherwise they will + * be handled asynchronously. + */ + static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext, + transport::SessionHandle session, + bool sync); + ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync); /* @@ -69,6 +75,7 @@ public: * Source -> SourceWait -> Process -> Source (fire-and-forget) */ enum class State { + Created, // The session has been created, but no operations have been performed yet Source, // Request a new Message from the network to handle SourceWait, // Wait for the new Message to arrive from the network Process, // Run the Message through the database @@ -85,6 +92,9 @@ public: * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack * if they have just completed a database operation to make sure that this doesn't infinitely * recurse. + * + * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take + * ownership of the SSM, it will call scheduleNext() and return immediately. */ void runNext(); @@ -100,17 +110,60 @@ public: /* * Gets the current state of connection for testing/diagnostic purposes. */ - State state() const { - return _state; - } + State state(); + + /* + * Terminates the associated transport Session, and requests that the next call to runNext + * should end the session. If the session has already ended, this does nothing. + */ + void terminate(); /* - * Explicitly ends the session. + * Sets a function to be called after the session is ended */ - void endSession(); + void setCleanupHook(stdx::function<void()> hook); + + /* + * Gets the transport::Session associated with this connection + */ + const transport::SessionHandle& session() const; private: /* + * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + */ + class ThreadGuard; + friend class ThreadGuard; + + /* + * This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor + * while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for + * callbacks to run. + */ + template <typename Executor, typename Func> + void maybeScheduleFunc(Executor* svcExec, Func&& func) { + if (svcExec) { + uassertStatusOK(svcExec->schedule( + [ func = std::move(func), anchor = shared_from_this() ] { func(); })); + } + } + + template <typename Func> + void scheduleFunc(Func&& func) { + auto svcExec = _serviceContext->getServiceExecutor(); + invariant(svcExec); + maybeScheduleFunc(svcExec, func); + } + + /* + * This is the actual implementation of runNext() that gets called after the ThreadGuard + * has been successfully created. If any callbacks (like sourceCallback()) need to call + * runNext() and already own a ThreadGuard, they should call this with that guard as the + * argument. + */ + void runNextInGuard(ThreadGuard& guard); + + /* * This function actually calls into the database and processes a request. It's broken out * into its own inline function for better readability. */ @@ -123,21 +176,20 @@ private: void sinkCallback(Status status); /* - * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + * Releases all the resources associated with the session and call the cleanupHook. */ - class ThreadGuard; - friend class ThreadGuard; - - const transport::SessionHandle& session() const; + void cleanupSession(); - State _state{State::Source}; + AtomicWord<State> _state{State::Created}; ServiceEntryPoint* _sep; bool _sync; + ServiceContext* const _serviceContext; ServiceContext::UniqueClient _dbClient; const Client* _dbClientPtr; const std::string _threadName; + stdx::function<void()> _cleanupHook; bool inExhaust = false; bool wasCompressed = false; @@ -148,6 +200,32 @@ private: std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT }; -std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state); +template <typename T> +T& operator<<(T& stream, const ServiceStateMachine::State& state) { + switch (state) { + case ServiceStateMachine::State::Created: + stream << "created"; + break; + case ServiceStateMachine::State::Source: + stream << "source"; + break; + case ServiceStateMachine::State::SourceWait: + stream << "sourceWait"; + break; + case ServiceStateMachine::State::Process: + stream << "process"; + break; + case ServiceStateMachine::State::SinkWait: + stream << "sinkWait"; + break; + case ServiceStateMachine::State::EndSession: + stream << "endSession"; + break; + case ServiceStateMachine::State::Ended: + stream << "ended"; + break; + } + return stream; +} } // namespace mongo |