diff options
Diffstat (limited to 'src/mongo/transport/service_state_machine.cpp')
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 295 |
1 files changed, 40 insertions, 255 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 0cb587426e9..acc60bc3fcb 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -38,6 +38,7 @@ #include "mongo/base/status.h" #include "mongo/config.h" #include "mongo/db/client.h" +#include "mongo/db/client_strand.h" #include "mongo/db/dbmessage.h" #include "mongo/db/stats/counters.h" #include "mongo/db/traffic_recorder.h" @@ -56,7 +57,6 @@ #include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/concurrency/thread_name.h" #include "mongo/util/debug_util.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point.h" @@ -204,19 +204,11 @@ public: */ enum class Ownership { kUnowned, kOwned, kStatic }; - /* - * A class that wraps up lifetime management of the _client and _threadName for each step in - * runOnce(); - */ - class ThreadGuard; - class ThreadGuardedExecutor; - Impl(ServiceContext::UniqueClient client) : _state{State::Created}, _serviceContext{client->getServiceContext()}, _sep{_serviceContext->getServiceEntryPoint()}, - _client{std::move(client)}, - _clientPtr{_client.get()} {} + _clientStrand{ClientStrand::make(std::move(client))} {} void start(ServiceExecutorContext seCtx); @@ -257,8 +249,7 @@ public: void sinkCallback(Status status); /* - * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just - * before waiting on the TL. + * Source/Sink message from the TransportLayer. */ Future<void> sourceMessage(); Future<void> sinkMessage(); @@ -290,14 +281,14 @@ public: * Gets the transport::Session associated with this connection */ const transport::SessionHandle& session() { - return _clientPtr->session(); + return _clientStrand->getClientPointer()->session(); } /* * Gets the transport::ServiceExecutor associated with this connection. */ ServiceExecutor* executor() { - return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor(); + return ServiceExecutorContext::get(_clientStrand->getClientPointer())->getServiceExecutor(); } private: @@ -307,8 +298,7 @@ private: ServiceEntryPoint* const _sep; transport::SessionHandle _sessionHandle; - ServiceContext::UniqueClient _client; - Client* _clientPtr; + ClientStrandPtr _clientStrand; std::function<void()> _cleanupHook; bool _inExhaust = false; @@ -323,214 +313,6 @@ private: ServiceContext::UniqueOperationContext _opCtx; }; -/* - * This class wraps up the logic for swapping/unswapping the Client when transitioning between - * states. - * - * In debug builds this also ensures that only one thread is working on the SSM at once. - */ -class ServiceStateMachine::Impl::ThreadGuard { - ThreadGuard(ThreadGuard&) = delete; - ThreadGuard& operator=(ThreadGuard&) = delete; - -public: - explicit ThreadGuard(ServiceStateMachine::Impl* ssm) : _ssm{ssm} { - invariant(_ssm); - - if (_ssm->_clientPtr == Client::getCurrent()) { - // We're not the first on this thread, nothing more to do. - return; - } - - auto& client = _ssm->_client; - invariant(client); - - // Set up the thread name - auto oldThreadName = getThreadName(); - const auto& threadName = client->desc(); - if (oldThreadName != threadName) { - _oldThreadName = oldThreadName.toString(); - setThreadName(threadName); - } - - // Swap the current Client so calls to cc() work as expected - Client::setCurrent(std::move(client)); - _haveTakenOwnership = true; - } - - // Constructing from a moved ThreadGuard invalidates the other thread guard. - ThreadGuard(ThreadGuard&& other) - : _ssm{std::exchange(other._ssm, nullptr)}, - _haveTakenOwnership{std::exchange(_haveTakenOwnership, false)} {} - - ThreadGuard& operator=(ThreadGuard&& other) { - _ssm = std::exchange(other._ssm, nullptr); - _haveTakenOwnership = std::exchange(other._haveTakenOwnership, false); - return *this; - }; - - ThreadGuard() = delete; - - ~ThreadGuard() { - release(); - } - - explicit operator bool() const { - return _ssm; - } - - void release() { - if (!_ssm) { - // We've been released or moved from. - return; - } - - // If we have a ServiceStateMachine pointer, then it should control the current Client. - invariant(_ssm->_clientPtr == Client::getCurrent()); - - if (auto haveTakenOwnership = std::exchange(_haveTakenOwnership, false); - !haveTakenOwnership) { - // Reset our pointer so that we cannot release again. - _ssm = nullptr; - - // We are not the original owner, nothing more to do. - return; - } - - // Reclaim the client. - _ssm->_client = Client::releaseCurrent(); - - // Reset our pointer so that we cannot release again. - _ssm = nullptr; - - if (!_oldThreadName.empty()) { - // Reset the old thread name. - setThreadName(_oldThreadName); - } - } - -private: - ServiceStateMachine::Impl* _ssm = nullptr; - - bool _haveTakenOwnership = false; - std::string _oldThreadName; -}; - -auto getThreadGuardedExecutor = - Client::declareDecoration<std::shared_ptr<ServiceStateMachine::Impl::ThreadGuardedExecutor>>(); - -/* - * A ThreadGuardedExecutor is a client decoration that can wrap any OutOfLineExecutor to allow - * processing tasks while having the client object (i.e., `Client`) attached to the executor thread. - * In particular, scheduling tasks through a ThreadGuardedExecutor ensures that the corresponding - * client object is reachable through `Client::getCurrent()` and prevents concurrent accesses to the - * client object. A ThreadGuardedExecutor is only valid in the context of ServiceStateMachine. - * Also, any task scheduled through ThreadGuardedExecutor captures a reference (i.e., shared - * pointer) to both ServiceStateMachine and ThreadGuardedExecutor, thus accessing these objects - * through raw pointers (e.g., `this`) is considered safe. - */ -class ServiceStateMachine::Impl::ThreadGuardedExecutor - : public std::enable_shared_from_this<ServiceStateMachine::Impl::ThreadGuardedExecutor> { -public: - // Wraps an instance of OutOfLineExecutor and delegates scheduling to ThreadGuardedExecutor - class WrappedExecutor : public OutOfLineExecutor, - public std::enable_shared_from_this<WrappedExecutor> { - public: - WrappedExecutor(const WrappedExecutor&) = delete; - WrappedExecutor(WrappedExecutor&&) = delete; - - WrappedExecutor(std::shared_ptr<ThreadGuardedExecutor> parent, OutOfLineExecutor* executor) - : _parent(std::move(parent)), _executor(executor) {} - - void schedule(OutOfLineExecutor::Task task) override { - _parent->schedule(_executor, std::move(task)); - } - - private: - std::shared_ptr<ThreadGuardedExecutor> const _parent; - OutOfLineExecutor* const _executor; - }; - - ThreadGuardedExecutor() = delete; - ThreadGuardedExecutor(const ThreadGuardedExecutor&) = delete; - - explicit ThreadGuardedExecutor(std::weak_ptr<ServiceStateMachine::Impl> ssm) - : _ssm(std::move(ssm)) {} - - ThreadGuardedExecutor(ThreadGuardedExecutor&& other) - : _isBusy{other._isBusy.load()}, _ssm(other._ssm), _guard(std::move(other._guard)) {} - - ~ThreadGuardedExecutor() { - invariant(!_isBusy.load()); - invariant(!_guard.has_value()); - } - - static void set(Client* client, ThreadGuardedExecutor instance) { - auto& clientThreadGuardedExecutor = getThreadGuardedExecutor(client); - invariant(!clientThreadGuardedExecutor); - clientThreadGuardedExecutor = std::make_shared<decltype(instance)>(std::move(instance)); - } - - static std::shared_ptr<ThreadGuardedExecutor> get(const Client* client) { - return getThreadGuardedExecutor(client); - } - - auto wrapExecutor(OutOfLineExecutor* executor) { - return std::make_shared<WrappedExecutor>(shared_from_this(), executor); - } - - void schedule(OutOfLineExecutor* executor, OutOfLineExecutor::Task task) { - // Since `ThreadGuardedExecutor` is a client decoration, and SSM owns the client object, - // ServiceStateMachine must be present when tasks are scheduled here. - auto ssm = _ssm.lock(); - invariant(ssm); - - executor->schedule([this, - executor, // Valid as the executor must be present to run the task - task = std::move(task), - ssm = std::move(ssm), - anchor = shared_from_this()](Status status) mutable { - if (auto wasBusy = _isBusy.swap(true); wasBusy) { - // Reschedule if another executor thread is running a task for the client. - LOGV2_DEBUG(4910704, kDiagnosticLogLevel, "Rescheduling thread-guarded task"); - schedule(executor, std::move(task)); - return; - } - - invariant(!_guard.has_value()); - _guard.emplace(ssm.get()); - LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Acquired ThreadGuard in scheduled task"); - - ON_BLOCK_EXIT([&] { - if (_guard.has_value()) { - releaseThreadGuard(); - } - }); - - LOGV2_DEBUG( - 4910703, kDiagnosticLogLevel, "Started running task in a thread-guarded context"); - task(status); - }); - } - - // Must only be called on the thread that owns the guard - void releaseThreadGuard() { - invariant(_guard.has_value() && _isBusy.load()); - LOGV2_DEBUG(4910702, kDiagnosticLogLevel, "Releasing the ThreadGuard"); - _guard = boost::none; - _isBusy.store(false); - } - -private: - static constexpr auto kDiagnosticLogLevel = 3; - - // Set to `true` if the executor is busy running a task on behalf of the corresponding client. - AtomicWord<bool> _isBusy{false}; - - std::weak_ptr<ServiceStateMachine::Impl> _ssm; - boost::optional<ThreadGuard> _guard; -}; - Future<void> ServiceStateMachine::Impl::sourceMessage() { invariant(_inMessage.empty()); invariant(_state.load() == State::Source); @@ -583,8 +365,6 @@ Future<void> ServiceStateMachine::Impl::sinkMessage() { } void ServiceStateMachine::Impl::sourceCallback(Status status) { - auto guard = ThreadGuard(this); - invariant(state() == State::SourceWait); auto remote = session()->remote(); @@ -626,8 +406,6 @@ void ServiceStateMachine::Impl::sourceCallback(Status status) { } void ServiceStateMachine::Impl::sinkCallback(Status status) { - auto guard = ThreadGuard(this); - invariant(state() == State::SinkWait); // If there was an error sinking the message to the client, then we should print an error and @@ -736,25 +514,25 @@ Future<void> ServiceStateMachine::Impl::processMessage() { void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) { { - stdx::lock_guard lk(*_clientPtr); - - ServiceExecutorContext::set(_clientPtr, std::move(seCtx)); + auto client = _clientStrand->getClientPointer(); + stdx::lock_guard lk(*client); + ServiceExecutorContext::set(client, std::move(seCtx)); } - // Set the executor decoration here to ensure `shared_from_this()` returns a valid pointer - ThreadGuardedExecutor::set(_clientPtr, ThreadGuardedExecutor(shared_from_this())); + invariant(_state.swap(State::Source) == State::Created); - ThreadGuardedExecutor::get(_clientPtr) - ->schedule( - executor(), - GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) { - // If this is the first run of the SSM, then update its state to Source - if (state() == State::Created) { - _state.store(State::Source); - } + auto cb = [this, anchor = shared_from_this()](Status status) { + _clientStrand->run([&] { + if (ErrorCodes::isCancelationError(status)) { + cleanupSession(); + return; + } + invariant(status); - runOnce(); - })); + runOnce(); + }); + }; + executor()->schedule(std::move(cb)); } void ServiceStateMachine::Impl::runOnce() { @@ -791,19 +569,25 @@ void ServiceStateMachine::Impl::runOnce() { "error"_attr = status); terminate(); - ThreadGuardedExecutor::get(_clientPtr) - ->schedule(executor(), - GuaranteedExecutor::enforceRunOnce( - [this, anchor = shared_from_this()](Status status) { - cleanupSession(); - })); + auto cb = [this, anchor = shared_from_this()](Status status) { + _clientStrand->run([&] { cleanupSession(); }); + }; + executor()->schedule(std::move(cb)); return; } - ThreadGuardedExecutor::get(_clientPtr) - ->schedule(executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) { - runOnce(); - })); + auto cb = [this, anchor = shared_from_this()](Status status) { + _clientStrand->run([&] { + if (ErrorCodes::isCancelationError(status)) { + cleanupSession(); + return; + } + invariant(status); + + runOnce(); + }); + }; + executor()->schedule(std::move(cb)); }); } @@ -878,8 +662,9 @@ void ServiceStateMachine::Impl::cleanupSession() { cleanupExhaustResources(); { - stdx::lock_guard lk(*_clientPtr); - transport::ServiceExecutorContext::reset(_clientPtr); + auto client = _clientStrand->getClientPointer(); + stdx::lock_guard lk(*client); + transport::ServiceExecutorContext::reset(client); } if (auto cleanupHook = std::exchange(_cleanupHook, {})) { |