summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r--src/mongo/transport/service_state_machine.h74
1 files changed, 50 insertions, 24 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 7794cdfbfce..0572c4f6ac8 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,7 +35,6 @@
#include "mongo/base/status.h"
#include "mongo/config.h"
-#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -45,11 +44,9 @@
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_mode.h"
-#include "mongo/util/future.h"
#include "mongo/util/net/ssl_manager.h"
namespace mongo {
-namespace transport {
/*
* The ServiceStateMachine holds the state of a single client connection and represents the
@@ -66,9 +63,17 @@ public:
ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;
/*
- * Construct a ServiceStateMachine for a given Client.
+ * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
+ * then calls into the transport layer will block while they complete, otherwise they will
+ * be handled asynchronously.
*/
- ServiceStateMachine(ServiceContext::UniqueClient client);
+ static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode);
+
+ ServiceStateMachine(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode);
/*
* Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -101,9 +106,31 @@ public:
enum class Ownership { kUnowned, kOwned, kStatic };
/*
- * start() schedules a call to _runOnce() in the future.
+ * runNext() will run the current state of the state machine. It also handles all the error
+ * handling and state management for requests.
+ *
+ * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
+ * if they have just completed a database operation to make sure that this doesn't infinitely
+ * recurse.
+ *
+ * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
+ * ownership of the SSM, it will call scheduleNext() and return immediately.
+ */
+ void runNext();
+
+ /*
+ * start() schedules a call to runNext() in the future.
+ *
+ * It is guaranteed to unwind the stack, and not call runNext() recursively, but is not
+ * guaranteed that runNext() will run after this return
+ */
+ void start(Ownership ownershipModel);
+
+ /*
+ * Set the executor to be used for the next call to runNext(). This allows switching between
+ * thread models after the SSM has started.
*/
- void start();
+ void setServiceExecutor(transport::ServiceExecutor* executor);
/*
* Gets the current state of connection for testing/diagnostic purposes.
@@ -133,8 +160,7 @@ public:
private:
/*
- * A class that wraps up lifetime management of the _dbClient and _threadName for
- * each step in _runOnce();
+ * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
*/
class ThreadGuard;
friend class ThreadGuard;
@@ -162,15 +188,18 @@ private:
const transport::SessionHandle& _session() const;
/*
- * Gets the transport::ServiceExecutor associated with this connection.
+ * This is the actual implementation of runNext() that gets called after the ThreadGuard
+ * has been successfully created. If any callbacks (like sourceCallback()) need to call
+ * runNext() and already own a ThreadGuard, they should call this with that guard as the
+ * argument.
*/
- ServiceExecutor* _executor();
+ void _runNextInGuard(ThreadGuard guard);
/*
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
- Future<void> _processMessage(ThreadGuard guard);
+ inline void _processMessage(ThreadGuard guard);
/*
* These get called by the TransportLayer when requested network I/O has completed.
@@ -182,8 +211,8 @@ private:
* Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
* before waiting on the TL.
*/
- Future<void> _sourceMessage(ThreadGuard guard);
- Future<void> _sinkMessage(ThreadGuard guard);
+ void _sourceMessage(ThreadGuard guard);
+ void _sinkMessage(ThreadGuard guard, Message toSink);
/*
* Releases all the resources associated with the session and call the cleanupHook.
@@ -191,30 +220,27 @@ private:
void _cleanupSession(ThreadGuard guard);
/*
- * This is the initial function called at the beginning of a thread's lifecycle in the
- * TransportLayer.
- */
- void _runOnce();
-
- /*
* Releases all the resources associated with the exhaust request.
*/
void _cleanupExhaustResources() noexcept;
AtomicWord<State> _state{State::Created};
+ ServiceEntryPoint* _sep;
+ transport::Mode _transportMode;
+
ServiceContext* const _serviceContext;
- ServiceEntryPoint* const _sep;
+ transport::ServiceExecutor* _serviceExecutor;
transport::SessionHandle _sessionHandle;
+ const std::string _threadName;
ServiceContext::UniqueClient _dbClient;
- Client* _dbClientPtr;
+ const Client* _dbClientPtr;
std::function<void()> _cleanupHook;
bool _inExhaust = false;
boost::optional<MessageCompressorId> _compressorId;
Message _inMessage;
- Message _outMessage;
// Allows delegating destruction of opCtx to another function to potentially remove its cost
// from the critical path. This is currently only used in `_processMessage()`.
@@ -224,6 +250,7 @@ private:
#if MONGO_CONFIG_DEBUG_BUILD
AtomicWord<stdx::thread::id> _owningThread;
#endif
+ std::string _oldThreadName;
};
template <typename T>
@@ -254,5 +281,4 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) {
return stream;
}
-} // namespace transport
} // namespace mongo