From 1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Tue, 27 Jun 2017 14:04:53 -0400 Subject: SERVER-29402 Implement ServiceExecutor and fixed-size test executor --- src/mongo/transport/service_state_machine.h | 114 +++++++++++++++++++++++----- 1 file changed, 96 insertions(+), 18 deletions(-) (limited to 'src/mongo/transport/service_state_machine.h') diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index d1750a6eb98..e3212ad21b0 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -33,6 +33,8 @@ #include "mongo/base/status.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/transport/session.h" @@ -40,25 +42,29 @@ namespace mongo { class ServiceEntryPoint; -namespace transport { -class ServiceExecutorBase; -} // namespace transport - /* * The ServiceStateMachine holds the state of a single client connection and represents the * lifecycle of each user request as a state machine. It is the glue between the stateless * ServiceEntryPoint and TransportLayer that ties network and database logic together for a * user. */ -class ServiceStateMachine { +class ServiceStateMachine : public std::enable_shared_from_this { ServiceStateMachine(ServiceStateMachine&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&) = delete; public: - ServiceStateMachine() = default; ServiceStateMachine(ServiceStateMachine&&) = default; ServiceStateMachine& operator=(ServiceStateMachine&&) = default; + /* + * Creates a new ServiceStateMachine for a given session/service context. If sync is true, + * then calls into the transport layer will block while they complete, otherwise they will + * be handled asynchronously. + */ + static std::shared_ptr create(ServiceContext* svcContext, + transport::SessionHandle session, + bool sync); + ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync); /* @@ -69,6 +75,7 @@ public: * Source -> SourceWait -> Process -> Source (fire-and-forget) */ enum class State { + Created, // The session has been created, but no operations have been performed yet Source, // Request a new Message from the network to handle SourceWait, // Wait for the new Message to arrive from the network Process, // Run the Message through the database @@ -85,6 +92,9 @@ public: * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack * if they have just completed a database operation to make sure that this doesn't infinitely * recurse. + * + * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take + * ownership of the SSM, it will call scheduleNext() and return immediately. */ void runNext(); @@ -100,16 +110,59 @@ public: /* * Gets the current state of connection for testing/diagnostic purposes. */ - State state() const { - return _state; - } + State state(); + + /* + * Terminates the associated transport Session, and requests that the next call to runNext + * should end the session. If the session has already ended, this does nothing. + */ + void terminate(); /* - * Explicitly ends the session. + * Sets a function to be called after the session is ended */ - void endSession(); + void setCleanupHook(stdx::function hook); + + /* + * Gets the transport::Session associated with this connection + */ + const transport::SessionHandle& session() const; private: + /* + * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + */ + class ThreadGuard; + friend class ThreadGuard; + + /* + * This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor + * while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for + * callbacks to run. + */ + template + void maybeScheduleFunc(Executor* svcExec, Func&& func) { + if (svcExec) { + uassertStatusOK(svcExec->schedule( + [ func = std::move(func), anchor = shared_from_this() ] { func(); })); + } + } + + template + void scheduleFunc(Func&& func) { + auto svcExec = _serviceContext->getServiceExecutor(); + invariant(svcExec); + maybeScheduleFunc(svcExec, func); + } + + /* + * This is the actual implementation of runNext() that gets called after the ThreadGuard + * has been successfully created. If any callbacks (like sourceCallback()) need to call + * runNext() and already own a ThreadGuard, they should call this with that guard as the + * argument. + */ + void runNextInGuard(ThreadGuard& guard); + /* * This function actually calls into the database and processes a request. It's broken out * into its own inline function for better readability. @@ -123,21 +176,20 @@ private: void sinkCallback(Status status); /* - * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + * Releases all the resources associated with the session and call the cleanupHook. */ - class ThreadGuard; - friend class ThreadGuard; - - const transport::SessionHandle& session() const; + void cleanupSession(); - State _state{State::Source}; + AtomicWord _state{State::Created}; ServiceEntryPoint* _sep; bool _sync; + ServiceContext* const _serviceContext; ServiceContext::UniqueClient _dbClient; const Client* _dbClientPtr; const std::string _threadName; + stdx::function _cleanupHook; bool inExhaust = false; bool wasCompressed = false; @@ -148,6 +200,32 @@ private: std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT }; -std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state); +template +T& operator<<(T& stream, const ServiceStateMachine::State& state) { + switch (state) { + case ServiceStateMachine::State::Created: + stream << "created"; + break; + case ServiceStateMachine::State::Source: + stream << "source"; + break; + case ServiceStateMachine::State::SourceWait: + stream << "sourceWait"; + break; + case ServiceStateMachine::State::Process: + stream << "process"; + break; + case ServiceStateMachine::State::SinkWait: + stream << "sinkWait"; + break; + case ServiceStateMachine::State::EndSession: + stream << "endSession"; + break; + case ServiceStateMachine::State::Ended: + stream << "ended"; + break; + } + return stream; +} } // namespace mongo -- cgit v1.2.1