From 5f3f8d97455eb1a599bf29d3ef2c981c3be0f265 Mon Sep 17 00:00:00 2001 From: Ben Caimano Date: Mon, 14 Sep 2020 04:26:44 +0000 Subject: SERVER-50947 Implize ServiceStateMachine --- src/mongo/transport/service_state_machine.cpp | 348 +++++++++++++++++++------- src/mongo/transport/service_state_machine.h | 164 +----------- 2 files changed, 257 insertions(+), 255 deletions(-) diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index a526f252b20..6fde0057ed2 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -35,13 +35,19 @@ #include +#include "mongo/base/status.h" #include "mongo/config.h" +#include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/db/stats/counters.h" #include "mongo/db/traffic_recorder.h" #include "mongo/logv2/log.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" #include "mongo/rpc/message.h" #include "mongo/rpc/op_msg.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/message_compressor_base.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor_fixed.h" @@ -54,7 +60,9 @@ #include "mongo/util/debug_util.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/net/socket_exception.h" +#include "mongo/util/net/ssl_manager.h" #include "mongo/util/net/ssl_peer_info.h" #include "mongo/util/quick_exit.h" @@ -163,18 +171,168 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) { } } // namespace +class ServiceStateMachine::Impl final + : public std::enable_shared_from_this { +public: + /* + * Any state may transition to EndSession in case of an error, otherwise the valid state + * transitions are: + * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC) + * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust) + * Source -> SourceWait -> Process -> Source (fire-and-forget) + */ + enum class State { + Created, // The session has been created, but no operations have been performed yet + Source, // Request a new Message from the network to handle + SourceWait, // Wait for the new Message to arrive from the network + Process, // Run the Message through the database + SinkWait, // Wait for the database result to be sent by the network + EndSession, // End the session - the ServiceStateMachine will be invalid after this + Ended // The session has ended. It is illegal to call any method besides + // state() if this is the current state. + }; + + /* + * When start() is called with Ownership::kOwned, the SSM will swap the Client/thread name + * whenever it runs a stage of the state machine, and then unswap them out when leaving the SSM. + * + * With Ownership::kStatic, it will assume that the SSM will only ever be run from one thread, + * and that thread will not be used for other SSM's. It will swap in the Client/thread name for + * the first run and leave them in place. + * + * kUnowned is used internally to mark that the SSM is inactive. + */ + enum class Ownership { kUnowned, kOwned, kStatic }; + + /* + * A class that wraps up lifetime management of the _client and _threadName for each step in + * runOnce(); + */ + class ThreadGuard; + class ThreadGuardedExecutor; + + Impl(ServiceContext::UniqueClient client) + : _state{State::Created}, + _serviceContext{client->getServiceContext()}, + _sep{_serviceContext->getServiceEntryPoint()}, + _client{std::move(client)}, + _clientPtr{_client.get()} {} + + void start(ServiceExecutorContext seCtx); + + void setCleanupHook(std::function hook); + + /* + * Terminates the associated transport Session, regardless of tags. + * + * This will not block on the session terminating cleaning itself up, it returns immediately. + */ + void terminate(); + + /* + * Terminates the associated transport Session if its tags don't match the supplied tags. If + * the session is in a pending state, before any tags have been set, it will not be terminated. + * + * This will not block on the session terminating cleaning itself up, it returns immediately. + */ + void terminateIfTagsDontMatch(transport::Session::TagMask tags); + + /* + * Terminates the associated transport Session if status indicate error. + * + * This will not block on the session terminating cleaning itself up, it returns immediately. + */ + void terminateAndLogIfError(Status status); + + /* + * This function actually calls into the database and processes a request. It's broken out + * into its own inline function for better readability. + */ + Future processMessage(); + + /* + * These get called by the TransportLayer when requested network I/O has completed. + */ + void sourceCallback(Status status); + void sinkCallback(Status status); + + /* + * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just + * before waiting on the TL. + */ + Future sourceMessage(); + Future sinkMessage(); + + /* + * Releases all the resources associated with the session and call the cleanupHook. + */ + void cleanupSession(); + + /* + * This is the initial function called at the beginning of a thread's lifecycle in the + * TransportLayer. + */ + void runOnce(); + + /* + * Releases all the resources associated with the exhaust request. + */ + void cleanupExhaustResources() noexcept; + + /* + * Gets the current state of connection for testing/diagnostic purposes. + */ + State state() const { + return _state.load(); + } + + /* + * Gets the transport::Session associated with this connection + */ + const transport::SessionHandle& session() { + return _clientPtr->session(); + } + + /* + * Gets the transport::ServiceExecutor associated with this connection. + */ + ServiceExecutor* executor() { + return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor(); + } + +private: + AtomicWord _state{State::Created}; + + ServiceContext* const _serviceContext; + ServiceEntryPoint* const _sep; + + transport::SessionHandle _sessionHandle; + ServiceContext::UniqueClient _client; + Client* _clientPtr; + std::function _cleanupHook; + + bool _inExhaust = false; + boost::optional _compressorId; + Message _inMessage; + Message _outMessage; + + // Allows delegating destruction of opCtx to another function to potentially remove its cost + // from the critical path. This is currently only used in `processMessage()`. + ServiceContext::UniqueOperationContext _killedOpCtx; +}; + /* - * This class wraps up the logic for swapping/unswapping the Client when transitioning - * between states. + * This class wraps up the logic for swapping/unswapping the Client when transitioning between + * states. * * In debug builds this also ensures that only one thread is working on the SSM at once. */ -class ServiceStateMachine::ThreadGuard { +class ServiceStateMachine::Impl::ThreadGuard { ThreadGuard(ThreadGuard&) = delete; ThreadGuard& operator=(ThreadGuard&) = delete; public: - explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} { + explicit ThreadGuard(ServiceStateMachine::Impl* ssm) : _ssm{ssm} { invariant(_ssm); if (_ssm->_clientPtr == Client::getCurrent()) { @@ -250,14 +408,14 @@ public: } private: - ServiceStateMachine* _ssm = nullptr; + ServiceStateMachine::Impl* _ssm = nullptr; bool _haveTakenOwnership = false; std::string _oldThreadName; }; auto getThreadGuardedExecutor = - Client::declareDecoration>(); + Client::declareDecoration>(); /* * A ThreadGuardedExecutor is a client decoration that can wrap any OutOfLineExecutor to allow @@ -269,8 +427,8 @@ auto getThreadGuardedExecutor = * pointer) to both ServiceStateMachine and ThreadGuardedExecutor, thus accessing these objects * through raw pointers (e.g., `this`) is considered safe. */ -class ServiceStateMachine::ThreadGuardedExecutor - : public std::enable_shared_from_this { +class ServiceStateMachine::Impl::ThreadGuardedExecutor + : public std::enable_shared_from_this { public: // Wraps an instance of OutOfLineExecutor and delegates scheduling to ThreadGuardedExecutor class WrappedExecutor : public OutOfLineExecutor, @@ -294,7 +452,8 @@ public: ThreadGuardedExecutor() = delete; ThreadGuardedExecutor(const ThreadGuardedExecutor&) = delete; - explicit ThreadGuardedExecutor(std::weak_ptr ssm) : _ssm(std::move(ssm)) {} + explicit ThreadGuardedExecutor(std::weak_ptr ssm) + : _ssm(std::move(ssm)) {} ThreadGuardedExecutor(ThreadGuardedExecutor&& other) : _isBusy{other._isBusy.load()}, _ssm(other._ssm), _guard(std::move(other._guard)) {} @@ -337,7 +496,7 @@ public: } invariant(!_guard.has_value()); - _guard = ThreadGuard(ssm.get()); + _guard.emplace(ssm.get()); LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Acquired ThreadGuard in scheduled task"); ON_BLOCK_EXIT([&] { @@ -366,38 +525,23 @@ private: // Set to `true` if the executor is busy running a task on behalf of the corresponding client. AtomicWord _isBusy{false}; - std::weak_ptr _ssm; + std::weak_ptr _ssm; boost::optional _guard; }; -ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client) - : _state{State::Created}, - _serviceContext{client->getServiceContext()}, - _sep{_serviceContext->getServiceEntryPoint()}, - _client{std::move(client)}, - _clientPtr{_client.get()} {} - -const transport::SessionHandle& ServiceStateMachine::_session() { - return _clientPtr->session(); -} - -ServiceExecutor* ServiceStateMachine::_executor() { - return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor(); -} - -Future ServiceStateMachine::_sourceMessage() { +Future ServiceStateMachine::Impl::sourceMessage() { invariant(_inMessage.empty()); invariant(_state.load() == State::Source); _state.store(State::SourceWait); auto sourceMsgImpl = [&] { - const auto& transportMode = _executor()->transportMode(); + const auto& transportMode = executor()->transportMode(); if (transportMode == transport::Mode::kSynchronous) { MONGO_IDLE_THREAD_BLOCK; - return Future::makeReady(_session()->sourceMessage()); + return Future::makeReady(session()->sourceMessage()); } else { invariant(transportMode == transport::Mode::kAsynchronous); - return _session()->asyncSourceMessage(); + return session()->asyncSourceMessage(); } }; @@ -406,49 +550,49 @@ Future ServiceStateMachine::_sourceMessage() { _inMessage = std::move(msg.getValue()); invariant(!_inMessage.empty()); } - _sourceCallback(msg.getStatus()); + sourceCallback(msg.getStatus()); return Status::OK(); }); } -Future ServiceStateMachine::_sinkMessage() { +Future ServiceStateMachine::Impl::sinkMessage() { // Sink our response to the client invariant(_state.load() == State::Process); _state.store(State::SinkWait); auto toSink = std::exchange(_outMessage, {}); auto sinkMsgImpl = [&] { - const auto& transportMode = _executor()->transportMode(); + const auto& transportMode = executor()->transportMode(); if (transportMode == transport::Mode::kSynchronous) { // We don't consider ourselves idle while sending the reply since we are still doing // work on behalf of the client. Contrast that with sourceMessage() where we are waiting // for the client to send us more work to do. - return Future::makeReady(_session()->sinkMessage(std::move(toSink))); + return Future::makeReady(session()->sinkMessage(std::move(toSink))); } else { invariant(transportMode == transport::Mode::kAsynchronous); - return _session()->asyncSinkMessage(std::move(toSink)); + return session()->asyncSinkMessage(std::move(toSink)); } }; return sinkMsgImpl().onCompletion([this](Status status) { - _sinkCallback(std::move(status)); + sinkCallback(std::move(status)); return Status::OK(); }); } -void ServiceStateMachine::_sourceCallback(Status status) { +void ServiceStateMachine::Impl::sourceCallback(Status status) { auto guard = ThreadGuard(this); invariant(state() == State::SourceWait); - auto remote = _session()->remote(); + auto remote = session()->remote(); if (status.isOK()) { _state.store(State::Process); - // If the sourceMessage succeeded then we can move to on to process the message. We - // simply return from here and the future chain in _runOnce() will continue to the - // next state normally. + // If the sourceMessage succeeded then we can move to on to process the message. We simply + // return from here and the future chain in runOnce() will continue to the next state + // normally. // If any other issues arise, close the session. } else if (ErrorCodes::isInterruption(status.code()) || @@ -473,13 +617,13 @@ void ServiceStateMachine::_sourceCallback(Status status) { "Error receiving request from client. Ending connection from remote", "error"_attr = status, "remote"_attr = remote, - "connectionId"_attr = _session()->id()); + "connectionId"_attr = session()->id()); _state.store(State::EndSession); } uassertStatusOK(status); } -void ServiceStateMachine::_sinkCallback(Status status) { +void ServiceStateMachine::Impl::sinkCallback(Status status) { auto guard = ThreadGuard(this); invariant(state() == State::SinkWait); @@ -493,8 +637,8 @@ void ServiceStateMachine::_sinkCallback(Status status) { LOGV2(22989, "Error sending response to client. Ending connection from remote", "error"_attr = status, - "remote"_attr = _session()->remote(), - "connectionId"_attr = _session()->id()); + "remote"_attr = session()->remote(), + "connectionId"_attr = session()->id()); _state.store(State::EndSession); uassertStatusOK(status); } else if (_inExhaust) { @@ -504,13 +648,13 @@ void ServiceStateMachine::_sinkCallback(Status status) { } } -Future ServiceStateMachine::_processMessage() { +Future ServiceStateMachine::Impl::processMessage() { invariant(!_inMessage.empty()); TrafficRecorder::get(_serviceContext) - .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage); + .observe(session(), _serviceContext->getPreciseClockSource()->now(), _inMessage); - auto& compressorMgr = MessageCompressorManager::forSession(_session()); + auto& compressorMgr = MessageCompressorManager::forSession(session()); _compressorId = boost::none; if (_inMessage.operation() == dbCompressed) { @@ -553,7 +697,7 @@ Future ServiceStateMachine::_processMessage() { toSink.header().setResponseToMsgId(_inMessage.header().getId()); if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { #ifdef MONGO_CONFIG_SSL - if (!SSLPeerInfo::forSession(_session()).isTLS) { + if (!SSLPeerInfo::forSession(session()).isTLS) { OpMsg::appendChecksum(&toSink); } #else @@ -578,7 +722,7 @@ Future ServiceStateMachine::_processMessage() { } TrafficRecorder::get(_serviceContext) - .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink); + .observe(session(), _serviceContext->getPreciseClockSource()->now(), toSink); _outMessage = std::move(toSink); } else { @@ -590,7 +734,7 @@ Future ServiceStateMachine::_processMessage() { }); } -void ServiceStateMachine::start(ServiceExecutorContext seCtx) { +void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) { { stdx::lock_guard lk(*_clientPtr); @@ -602,34 +746,32 @@ void ServiceStateMachine::start(ServiceExecutorContext seCtx) { ThreadGuardedExecutor::get(_clientPtr) ->schedule( - _executor(), + executor(), GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) { // If this is the first run of the SSM, then update its state to Source if (state() == State::Created) { _state.store(State::Source); } - _runOnce(); + runOnce(); })); } -void ServiceStateMachine::_runOnce() { - makeReadyFutureWith([] {}) - .thenRunOn(ThreadGuardedExecutor::get(_clientPtr)->wrapExecutor(_executor())) - .then([this]() -> Future { - if (_inExhaust) { - return Status::OK(); - } else { - return _sourceMessage(); - } - }) - .then([this]() { return _processMessage(); }) +void ServiceStateMachine::Impl::runOnce() { + makeReadyFutureWith([&]() -> Future { + if (_inExhaust) { + return Status::OK(); + } else { + return sourceMessage(); + } + }) + .then([this]() { return processMessage(); }) .then([this]() -> Future { if (_outMessage.empty()) { return Status::OK(); } - return _sinkMessage(); + return sinkMessage(); }) .getAsync([this](Status status) { // Destroy the opCtx (already killed) here, to potentially use the delay between @@ -639,9 +781,9 @@ void ServiceStateMachine::_runOnce() { } if (!status.isOK()) { _state.store(State::EndSession); - // The service executor failed to schedule the task. This could for example be - // that we failed to start a worker thread. Terminate this connection to leave - // the system in a valid state. + // The service executor failed to schedule the task. This could for example be that + // we failed to start a worker thread. Terminate this connection to leave the system + // in a valid state. LOGV2_WARNING_OPTIONS(4910400, {logv2::LogComponent::kExecutor}, "Terminating session due to error: {error}", @@ -650,56 +792,46 @@ void ServiceStateMachine::_runOnce() { terminate(); ThreadGuardedExecutor::get(_clientPtr) - ->schedule(_executor(), + ->schedule(executor(), GuaranteedExecutor::enforceRunOnce( [this, anchor = shared_from_this()](Status status) { - _cleanupSession(); + cleanupSession(); })); return; } ThreadGuardedExecutor::get(_clientPtr) - ->schedule(_executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) { - _runOnce(); + ->schedule(executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) { + runOnce(); })); }); } -void ServiceStateMachine::terminate() { +void ServiceStateMachine::Impl::terminate() { if (state() == State::Ended) return; - _session()->end(); + session()->end(); } -void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) { +void ServiceStateMachine::Impl::terminateIfTagsDontMatch(transport::Session::TagMask tags) { if (state() == State::Ended) return; - auto sessionTags = _session()->getTags(); + auto sessionTags = session()->getTags(); - // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have - // been set, then skip the termination check. + // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been + // set, then skip the termination check. if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) { - LOGV2(22991, - "Skip closing connection for connection", - "connectionId"_attr = _session()->id()); + LOGV2( + 22991, "Skip closing connection for connection", "connectionId"_attr = session()->id()); return; } terminate(); } -void ServiceStateMachine::setCleanupHook(std::function hook) { - invariant(state() == State::Created); - _cleanupHook = std::move(hook); -} - -ServiceStateMachine::State ServiceStateMachine::state() { - return _state.load(); -} - -void ServiceStateMachine::_terminateAndLogIfError(Status status) { +void ServiceStateMachine::Impl::terminateAndLogIfError(Status status) { if (!status.isOK()) { LOGV2_WARNING_OPTIONS(22993, {logv2::LogComponent::kExecutor}, @@ -710,7 +842,7 @@ void ServiceStateMachine::_terminateAndLogIfError(Status status) { } } -void ServiceStateMachine::_cleanupExhaustResources() noexcept try { +void ServiceStateMachine::Impl::cleanupExhaustResources() noexcept try { if (!_inExhaust) { return; } @@ -720,8 +852,8 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { auto cursorId = request.body["getMore"].Long(); auto opCtx = Client::getCurrent()->makeOperationContext(); // Fire and forget. This is a best effort attempt to immediately clean up the exhaust - // cursor. If the killCursors request fails here for any reasons, it will still be - // cleaned up once the cursor times out. + // cursor. If the killCursors request fails here for any reasons, it will still be cleaned + // up once the cursor times out. _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get(); } } catch (const DBException& e) { @@ -731,14 +863,19 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { "error"_attr = e.toStatus()); } -void ServiceStateMachine::_cleanupSession() { +void ServiceStateMachine::Impl::setCleanupHook(std::function hook) { + invariant(state() == State::Created); + _cleanupHook = std::move(hook); +} + +void ServiceStateMachine::Impl::cleanupSession() { // Ensure the delayed destruction of opCtx always happens before doing the cleanup. if (MONGO_likely(_killedOpCtx)) { _killedOpCtx.reset(); } invariant(!_killedOpCtx); - _cleanupExhaustResources(); + cleanupExhaustResources(); { stdx::lock_guard lk(*_clientPtr); @@ -756,5 +893,24 @@ void ServiceStateMachine::_cleanupSession() { _outMessage.reset(); } +ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client) + : _impl{std::make_shared(std::move(client))} {} + +void ServiceStateMachine::start(ServiceExecutorContext seCtx) { + _impl->start(std::move(seCtx)); +} + +void ServiceStateMachine::setCleanupHook(std::function hook) { + _impl->setCleanupHook(std::move(hook)); +} + +void ServiceStateMachine::terminate() { + _impl->terminate(); +} + +void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) { + _impl->terminateIfTagsDontMatch(tags); +} + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index 1962715e81a..917ca6ca09e 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -29,24 +29,13 @@ #pragma once -#include #include #include -#include "mongo/base/status.h" -#include "mongo/config.h" -#include "mongo/db/client.h" #include "mongo/db/service_context.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/platform/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/message_compressor_base.h" -#include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_mode.h" -#include "mongo/util/future.h" -#include "mongo/util/net/ssl_manager.h" namespace mongo { namespace transport { @@ -57,61 +46,26 @@ namespace transport { * ServiceEntryPoint and TransportLayer that ties network and database logic together for a * user. */ -class ServiceStateMachine : public std::enable_shared_from_this { -public: - class ThreadGuardedExecutor; - +class ServiceStateMachine { ServiceStateMachine(ServiceStateMachine&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&) = delete; ServiceStateMachine(ServiceStateMachine&&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&&) = delete; +public: + class Impl; + /* * Construct a ServiceStateMachine for a given Client. */ ServiceStateMachine(ServiceContext::UniqueClient client); - /* - * Any state may transition to EndSession in case of an error, otherwise the valid state - * transitions are: - * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC) - * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust) - * Source -> SourceWait -> Process -> Source (fire-and-forget) - */ - enum class State { - Created, // The session has been created, but no operations have been performed yet - Source, // Request a new Message from the network to handle - SourceWait, // Wait for the new Message to arrive from the network - Process, // Run the Message through the database - SinkWait, // Wait for the database result to be sent by the network - EndSession, // End the session - the ServiceStateMachine will be invalid after this - Ended // The session has ended. It is illegal to call any method besides - // state() if this is the current state. - }; - - /* - * When start() is called with Ownership::kOwned, the SSM will swap the Client/thread name - * whenever it runs a stage of the state machine, and then unswap them out when leaving the SSM. - * - * With Ownership::kStatic, it will assume that the SSM will only ever be run from one thread, - * and that thread will not be used for other SSM's. It will swap in the Client/thread name - * for the first run and leave them in place. - * - * kUnowned is used internally to mark that the SSM is inactive. - */ - enum class Ownership { kUnowned, kOwned, kStatic }; - /* * start() schedules a call to _runOnce() in the future. */ void start(ServiceExecutorContext seCtx); - /* - * Gets the current state of connection for testing/diagnostic purposes. - */ - State state(); - /* * Terminates the associated transport Session, regardless of tags. * @@ -134,116 +88,8 @@ public: void setCleanupHook(std::function hook); private: - /* - * A class that wraps up lifetime management of the _dbClient and _threadName for - * each step in _runOnce(); - */ - class ThreadGuard; - - /* - * Terminates the associated transport Session if status indicate error. - * - * This will not block on the session terminating cleaning itself up, it returns immediately. - */ - void _terminateAndLogIfError(Status status); - - /* - * Gets the transport::Session associated with this connection - */ - const transport::SessionHandle& _session(); - - /* - * Gets the transport::ServiceExecutor associated with this connection. - */ - ServiceExecutor* _executor(); - - /* - * This function actually calls into the database and processes a request. It's broken out - * into its own inline function for better readability. - */ - Future _processMessage(); - - /* - * These get called by the TransportLayer when requested network I/O has completed. - */ - void _sourceCallback(Status status); - void _sinkCallback(Status status); - - /* - * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just - * before waiting on the TL. - */ - Future _sourceMessage(); - Future _sinkMessage(); - - /* - * Releases all the resources associated with the session and call the cleanupHook. - */ - void _cleanupSession(); - - /* - * This is the initial function called at the beginning of a thread's lifecycle in the - * TransportLayer. - */ - void _runOnce(); - - /* - * Releases all the resources associated with the exhaust request. - */ - void _cleanupExhaustResources() noexcept; - - AtomicWord _state{State::Created}; - - ServiceContext* const _serviceContext; - ServiceEntryPoint* const _sep; - - transport::SessionHandle _sessionHandle; - ServiceContext::UniqueClient _client; - Client* _clientPtr; - std::function _cleanupHook; - - bool _inExhaust = false; - boost::optional _compressorId; - Message _inMessage; - Message _outMessage; - - // Allows delegating destruction of opCtx to another function to potentially remove its cost - // from the critical path. This is currently only used in `_processMessage()`. - ServiceContext::UniqueOperationContext _killedOpCtx; - - AtomicWord _owned{Ownership::kUnowned}; -#if MONGO_CONFIG_DEBUG_BUILD - AtomicWord _owningThread; -#endif + std::shared_ptr _impl; }; -template -T& operator<<(T& stream, const ServiceStateMachine::State& state) { - switch (state) { - case ServiceStateMachine::State::Created: - stream << "created"; - break; - case ServiceStateMachine::State::Source: - stream << "source"; - break; - case ServiceStateMachine::State::SourceWait: - stream << "sourceWait"; - break; - case ServiceStateMachine::State::Process: - stream << "process"; - break; - case ServiceStateMachine::State::SinkWait: - stream << "sinkWait"; - break; - case ServiceStateMachine::State::EndSession: - stream << "endSession"; - break; - case ServiceStateMachine::State::Ended: - stream << "ended"; - break; - } - return stream; -} - } // namespace transport } // namespace mongo -- cgit v1.2.1