summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r--src/mongo/transport/service_state_machine.h74
1 files changed, 24 insertions, 50 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 0572c4f6ac8..7794cdfbfce 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/config.h"
+#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -44,9 +45,11 @@
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_mode.h"
+#include "mongo/util/future.h"
#include "mongo/util/net/ssl_manager.h"
namespace mongo {
+namespace transport {
/*
* The ServiceStateMachine holds the state of a single client connection and represents the
@@ -63,17 +66,9 @@ public:
ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;
/*
- * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
- * then calls into the transport layer will block while they complete, otherwise they will
- * be handled asynchronously.
+ * Construct a ServiceStateMachine for a given Client.
*/
- static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
- transport::SessionHandle session,
- transport::Mode transportMode);
-
- ServiceStateMachine(ServiceContext* svcContext,
- transport::SessionHandle session,
- transport::Mode transportMode);
+ ServiceStateMachine(ServiceContext::UniqueClient client);
/*
* Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -106,31 +101,9 @@ public:
enum class Ownership { kUnowned, kOwned, kStatic };
/*
- * runNext() will run the current state of the state machine. It also handles all the error
- * handling and state management for requests.
- *
- * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
- * if they have just completed a database operation to make sure that this doesn't infinitely
- * recurse.
- *
- * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
- * ownership of the SSM, it will call scheduleNext() and return immediately.
- */
- void runNext();
-
- /*
- * start() schedules a call to runNext() in the future.
- *
- * It is guaranteed to unwind the stack, and not call runNext() recursively, but is not
- * guaranteed that runNext() will run after this return
- */
- void start(Ownership ownershipModel);
-
- /*
- * Set the executor to be used for the next call to runNext(). This allows switching between
- * thread models after the SSM has started.
+ * start() schedules a call to _runOnce() in the future.
*/
- void setServiceExecutor(transport::ServiceExecutor* executor);
+ void start();
/*
* Gets the current state of connection for testing/diagnostic purposes.
@@ -160,7 +133,8 @@ public:
private:
/*
- * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
+ * A class that wraps up lifetime management of the _dbClient and _threadName for
+ * each step in _runOnce();
*/
class ThreadGuard;
friend class ThreadGuard;
@@ -188,18 +162,15 @@ private:
const transport::SessionHandle& _session() const;
/*
- * This is the actual implementation of runNext() that gets called after the ThreadGuard
- * has been successfully created. If any callbacks (like sourceCallback()) need to call
- * runNext() and already own a ThreadGuard, they should call this with that guard as the
- * argument.
+ * Gets the transport::ServiceExecutor associated with this connection.
*/
- void _runNextInGuard(ThreadGuard guard);
+ ServiceExecutor* _executor();
/*
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
- inline void _processMessage(ThreadGuard guard);
+ Future<void> _processMessage(ThreadGuard guard);
/*
* These get called by the TransportLayer when requested network I/O has completed.
@@ -211,8 +182,8 @@ private:
* Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
* before waiting on the TL.
*/
- void _sourceMessage(ThreadGuard guard);
- void _sinkMessage(ThreadGuard guard, Message toSink);
+ Future<void> _sourceMessage(ThreadGuard guard);
+ Future<void> _sinkMessage(ThreadGuard guard);
/*
* Releases all the resources associated with the session and call the cleanupHook.
@@ -220,27 +191,30 @@ private:
void _cleanupSession(ThreadGuard guard);
/*
+ * This is the initial function called at the beginning of a thread's lifecycle in the
+ * TransportLayer.
+ */
+ void _runOnce();
+
+ /*
* Releases all the resources associated with the exhaust request.
*/
void _cleanupExhaustResources() noexcept;
AtomicWord<State> _state{State::Created};
- ServiceEntryPoint* _sep;
- transport::Mode _transportMode;
-
ServiceContext* const _serviceContext;
- transport::ServiceExecutor* _serviceExecutor;
+ ServiceEntryPoint* const _sep;
transport::SessionHandle _sessionHandle;
- const std::string _threadName;
ServiceContext::UniqueClient _dbClient;
- const Client* _dbClientPtr;
+ Client* _dbClientPtr;
std::function<void()> _cleanupHook;
bool _inExhaust = false;
boost::optional<MessageCompressorId> _compressorId;
Message _inMessage;
+ Message _outMessage;
// Allows delegating destruction of opCtx to another function to potentially remove its cost
// from the critical path. This is currently only used in `_processMessage()`.
@@ -250,7 +224,6 @@ private:
#if MONGO_CONFIG_DEBUG_BUILD
AtomicWord<stdx::thread::id> _owningThread;
#endif
- std::string _oldThreadName;
};
template <typename T>
@@ -281,4 +254,5 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) {
return stream;
}
+} // namespace transport
} // namespace mongo