summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.h
diff options
context:
space:
mode:
authorHenrik Edin <henrik.edin@mongodb.com>2017-08-15 11:25:22 -0400
committerHenrik Edin <henrik.edin@mongodb.com>2017-09-22 16:38:54 -0400
commit6732fbb1fb749e9f22f0ed4633e24515f842dafc (patch)
tree46ecca0cb2d251ee428f51fcb32c975a184bce9f /src/mongo/transport/service_state_machine.h
parentab7ceed2108a7d19518490929b03fa6f4a13257c (diff)
downloadmongo-6732fbb1fb749e9f22f0ed4633e24515f842dafc.tar.gz
SERVER-30135 Added a synchronous executor to make the code path between the two modes similar while still allowing customization in the execution. Should fix some perf regressions that came with unifying the service state machine.
Diffstat (limited to 'src/mongo/transport/service_state_machine.h')
-rw-r--r--src/mongo/transport/service_state_machine.h51
1 files changed, 28 insertions, 23 deletions
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index fab326a5a68..b89b3a34d24 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -41,6 +41,7 @@
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
+#include "mongo/transport/transport_mode.h"
namespace mongo {
@@ -65,17 +66,19 @@ public:
*/
static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
transport::SessionHandle session,
- bool sync);
+ transport::Mode transportMode);
- ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync);
+ ServiceStateMachine(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode);
/*
- * Any state may transition to EndSession in case of an error, otherwise the valid state
- * transitions are:
- * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC)
- * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust)
- * Source -> SourceWait -> Process -> Source (fire-and-forget)
- */
+ * Any state may transition to EndSession in case of an error, otherwise the valid state
+ * transitions are:
+ * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC)
+ * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust)
+ * Source -> SourceWait -> Process -> Source (fire-and-forget)
+ */
enum class State {
Created, // The session has been created, but no operations have been performed yet
Source, // Request a new Message from the network to handle
@@ -135,25 +138,28 @@ private:
friend class ThreadGuard;
/*
+ * Terminates the associated transport Session if status indicate error.
+ *
+ * This will not block on the session terminating cleaning itself up, it returns immediately.
+ */
+ void _terminateAndLogIfError(Status status);
+
+ /*
* This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor
* while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for
* callbacks to run.
*/
- template <typename Executor, typename Func>
- void _maybeScheduleFunc(Executor* svcExec,
- Func&& func,
- transport::ServiceExecutor::ScheduleFlags flags) {
- if (svcExec) {
- uassertStatusOK(svcExec->schedule(
- [ func = std::move(func), anchor = shared_from_this() ] { func(); }, flags));
- }
- }
-
template <typename Func>
void _scheduleFunc(Func&& func, transport::ServiceExecutor::ScheduleFlags flags) {
- auto svcExec = _serviceContext->getServiceExecutor();
- invariant(svcExec);
- _maybeScheduleFunc(svcExec, func, flags);
+ Status status = _serviceContext->getServiceExecutor()->schedule(
+ [ func = std::move(func), anchor = shared_from_this() ] { func(); }, flags);
+ if (!status.isOK()) {
+ // The service executor failed to schedule the task
+ // This could for example be that we failed to start
+ // a worker thread. Terminate this connection to
+ // leave the system in a valid state.
+ _terminateAndLogIfError(status);
+ }
}
/*
@@ -189,7 +195,7 @@ private:
AtomicWord<State> _state{State::Created};
ServiceEntryPoint* _sep;
- bool _sync;
+ transport::Mode _transportMode;
ServiceContext* const _serviceContext;
@@ -202,7 +208,6 @@ private:
bool _inExhaust = false;
boost::optional<MessageCompressorId> _compressorId;
Message _inMessage;
- int64_t _counter = 0;
AtomicWord<stdx::thread::id> _currentOwningThread;
std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT