summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.h
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2017-06-27 14:04:53 -0400
committerJonathan Reams <jbreams@mongodb.com>2017-07-14 16:19:40 -0400
commit1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69 (patch)
tree4cad8d64f1ace9bc13aea786b460872b1ce466c3 /src/mongo/transport/service_state_machine.h
parente0b06a9da3c0c6071f4e636f3c3ba3e8851c5db0 (diff)
downloadmongo-1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69.tar.gz
SERVER-29402 Implement ServiceExecutor and fixed-size test executor
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r--src/mongo/transport/service_state_machine.h114
1 files changed, 96 insertions, 18 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index d1750a6eb98..e3212ad21b0 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -33,6 +33,8 @@
#include "mongo/base/status.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/session.h"
@@ -40,25 +42,29 @@
namespace mongo {
class ServiceEntryPoint;
-namespace transport {
-class ServiceExecutorBase;
-} // namespace transport
-
/*
* The ServiceStateMachine holds the state of a single client connection and represents the
* lifecycle of each user request as a state machine. It is the glue between the stateless
* ServiceEntryPoint and TransportLayer that ties network and database logic together for a
* user.
*/
-class ServiceStateMachine {
+class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {
ServiceStateMachine(ServiceStateMachine&) = delete;
ServiceStateMachine& operator=(ServiceStateMachine&) = delete;
public:
- ServiceStateMachine() = default;
ServiceStateMachine(ServiceStateMachine&&) = default;
ServiceStateMachine& operator=(ServiceStateMachine&&) = default;
+ /*
+ * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
+ * then calls into the transport layer will block while they complete, otherwise they will
+ * be handled asynchronously.
+ */
+ static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ bool sync);
+
ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync);
/*
@@ -69,6 +75,7 @@ public:
* Source -> SourceWait -> Process -> Source (fire-and-forget)
*/
enum class State {
+ Created, // The session has been created, but no operations have been performed yet
Source, // Request a new Message from the network to handle
SourceWait, // Wait for the new Message to arrive from the network
Process, // Run the Message through the database
@@ -85,6 +92,9 @@ public:
* Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
* if they have just completed a database operation to make sure that this doesn't infinitely
* recurse.
+ *
+ * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
+ * ownership of the SSM, it will call scheduleNext() and return immediately.
*/
void runNext();
@@ -100,17 +110,60 @@ public:
/*
* Gets the current state of connection for testing/diagnostic purposes.
*/
- State state() const {
- return _state;
- }
+ State state();
+
+ /*
+ * Terminates the associated transport Session, and requests that the next call to runNext
+ * should end the session. If the session has already ended, this does nothing.
+ */
+ void terminate();
/*
- * Explicitly ends the session.
+ * Sets a function to be called after the session is ended
*/
- void endSession();
+ void setCleanupHook(stdx::function<void()> hook);
+
+ /*
+ * Gets the transport::Session associated with this connection
+ */
+ const transport::SessionHandle& session() const;
private:
/*
+ * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
+ */
+ class ThreadGuard;
+ friend class ThreadGuard;
+
+ /*
+ * This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor
+ * while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for
+ * callbacks to run.
+ */
+ template <typename Executor, typename Func>
+ void maybeScheduleFunc(Executor* svcExec, Func&& func) {
+ if (svcExec) {
+ uassertStatusOK(svcExec->schedule(
+ [ func = std::move(func), anchor = shared_from_this() ] { func(); }));
+ }
+ }
+
+ template <typename Func>
+ void scheduleFunc(Func&& func) {
+ auto svcExec = _serviceContext->getServiceExecutor();
+ invariant(svcExec);
+ maybeScheduleFunc(svcExec, func);
+ }
+
+ /*
+ * This is the actual implementation of runNext() that gets called after the ThreadGuard
+ * has been successfully created. If any callbacks (like sourceCallback()) need to call
+ * runNext() and already own a ThreadGuard, they should call this with that guard as the
+ * argument.
+ */
+ void runNextInGuard(ThreadGuard& guard);
+
+ /*
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
@@ -123,21 +176,20 @@ private:
void sinkCallback(Status status);
/*
- * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
+ * Releases all the resources associated with the session and call the cleanupHook.
*/
- class ThreadGuard;
- friend class ThreadGuard;
-
- const transport::SessionHandle& session() const;
+ void cleanupSession();
- State _state{State::Source};
+ AtomicWord<State> _state{State::Created};
ServiceEntryPoint* _sep;
bool _sync;
+ ServiceContext* const _serviceContext;
ServiceContext::UniqueClient _dbClient;
const Client* _dbClientPtr;
const std::string _threadName;
+ stdx::function<void()> _cleanupHook;
bool inExhaust = false;
bool wasCompressed = false;
@@ -148,6 +200,32 @@ private:
std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT
};
-std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state);
+template <typename T>
+T& operator<<(T& stream, const ServiceStateMachine::State& state) {
+ switch (state) {
+ case ServiceStateMachine::State::Created:
+ stream << "created";
+ break;
+ case ServiceStateMachine::State::Source:
+ stream << "source";
+ break;
+ case ServiceStateMachine::State::SourceWait:
+ stream << "sourceWait";
+ break;
+ case ServiceStateMachine::State::Process:
+ stream << "process";
+ break;
+ case ServiceStateMachine::State::SinkWait:
+ stream << "sinkWait";
+ break;
+ case ServiceStateMachine::State::EndSession:
+ stream << "endSession";
+ break;
+ case ServiceStateMachine::State::Ended:
+ stream << "ended";
+ break;
+ }
+ return stream;
+}
} // namespace mongo