From dabf0e0842dcedad2485cc19e65db262bb3172f6 Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Mon, 14 Sep 2020 22:01:12 +0000 Subject: SERVER-49107 Allow binding clients to executor threads --- src/mongo/transport/service_state_machine.cpp | 177 +++++++++++++++++++++----- src/mongo/transport/service_state_machine.h | 4 +- 2 files changed, 150 insertions(+), 31 deletions(-) diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index e9fa1ee95a6..a526f252b20 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -256,6 +256,120 @@ private: std::string _oldThreadName; }; +auto getThreadGuardedExecutor = + Client::declareDecoration>(); + +/* + * A ThreadGuardedExecutor is a client decoration that can wrap any OutOfLineExecutor to allow + * processing tasks while having the client object (i.e., `Client`) attached to the executor thread. + * In particular, scheduling tasks through a ThreadGuardedExecutor ensures that the corresponding + * client object is reachable through `Client::getCurrent()` and prevents concurrent accesses to the + * client object. A ThreadGuardedExecutor is only valid in the context of ServiceStateMachine. + * Also, any task scheduled through ThreadGuardedExecutor captures a reference (i.e., shared + * pointer) to both ServiceStateMachine and ThreadGuardedExecutor, thus accessing these objects + * through raw pointers (e.g., `this`) is considered safe. + */ +class ServiceStateMachine::ThreadGuardedExecutor + : public std::enable_shared_from_this { +public: + // Wraps an instance of OutOfLineExecutor and delegates scheduling to ThreadGuardedExecutor + class WrappedExecutor : public OutOfLineExecutor, + public std::enable_shared_from_this { + public: + WrappedExecutor(const WrappedExecutor&) = delete; + WrappedExecutor(WrappedExecutor&&) = delete; + + WrappedExecutor(std::shared_ptr parent, OutOfLineExecutor* executor) + : _parent(std::move(parent)), _executor(executor) {} + + void schedule(OutOfLineExecutor::Task task) override { + _parent->schedule(_executor, std::move(task)); + } + + private: + std::shared_ptr const _parent; + OutOfLineExecutor* const _executor; + }; + + ThreadGuardedExecutor() = delete; + ThreadGuardedExecutor(const ThreadGuardedExecutor&) = delete; + + explicit ThreadGuardedExecutor(std::weak_ptr ssm) : _ssm(std::move(ssm)) {} + + ThreadGuardedExecutor(ThreadGuardedExecutor&& other) + : _isBusy{other._isBusy.load()}, _ssm(other._ssm), _guard(std::move(other._guard)) {} + + ~ThreadGuardedExecutor() { + invariant(!_isBusy.load()); + invariant(!_guard.has_value()); + } + + static void set(Client* client, ThreadGuardedExecutor instance) { + auto& clientThreadGuardedExecutor = getThreadGuardedExecutor(client); + invariant(!clientThreadGuardedExecutor); + clientThreadGuardedExecutor = std::make_shared(std::move(instance)); + } + + static std::shared_ptr get(const Client* client) { + return getThreadGuardedExecutor(client); + } + + auto wrapExecutor(OutOfLineExecutor* executor) { + return std::make_shared(shared_from_this(), executor); + } + + void schedule(OutOfLineExecutor* executor, OutOfLineExecutor::Task task) { + // Since `ThreadGuardedExecutor` is a client decoration, and SSM owns the client object, + // ServiceStateMachine must be present when tasks are scheduled here. + auto ssm = _ssm.lock(); + invariant(ssm); + + executor->schedule([this, + executor, // Valid as the executor must be present to run the task + task = std::move(task), + ssm = std::move(ssm), + anchor = shared_from_this()](Status status) mutable { + if (auto wasBusy = _isBusy.swap(true); wasBusy) { + // Reschedule if another executor thread is running a task for the client. + LOGV2_DEBUG(4910704, kDiagnosticLogLevel, "Rescheduling thread-guarded task"); + schedule(executor, std::move(task)); + return; + } + + invariant(!_guard.has_value()); + _guard = ThreadGuard(ssm.get()); + LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Acquired ThreadGuard in scheduled task"); + + ON_BLOCK_EXIT([&] { + if (_guard.has_value()) { + releaseThreadGuard(); + } + }); + + LOGV2_DEBUG( + 4910703, kDiagnosticLogLevel, "Started running task in a thread-guarded context"); + task(status); + }); + } + + // Must only be called on the thread that owns the guard + void releaseThreadGuard() { + invariant(_guard.has_value() && _isBusy.load()); + LOGV2_DEBUG(4910702, kDiagnosticLogLevel, "Releasing the ThreadGuard"); + _guard = boost::none; + _isBusy.store(false); + } + +private: + static constexpr auto kDiagnosticLogLevel = 3; + + // Set to `true` if the executor is busy running a task on behalf of the corresponding client. + AtomicWord _isBusy{false}; + + std::weak_ptr _ssm; + boost::optional _guard; +}; + ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client) : _state{State::Created}, _serviceContext{client->getServiceContext()}, @@ -272,8 +386,6 @@ ServiceExecutor* ServiceStateMachine::_executor() { } Future ServiceStateMachine::_sourceMessage() { - auto guard = ThreadGuard(this); - invariant(_inMessage.empty()); invariant(_state.load() == State::Source); _state.store(State::SourceWait); @@ -300,8 +412,6 @@ Future ServiceStateMachine::_sourceMessage() { } Future ServiceStateMachine::_sinkMessage() { - auto guard = ThreadGuard(this); - // Sink our response to the client invariant(_state.load() == State::Process); _state.store(State::SinkWait); @@ -395,8 +505,6 @@ void ServiceStateMachine::_sinkCallback(Status status) { } Future ServiceStateMachine::_processMessage() { - auto guard = ThreadGuard(this); - invariant(!_inMessage.empty()); TrafficRecorder::get(_serviceContext) @@ -426,8 +534,6 @@ Future ServiceStateMachine::_processMessage() { return _sep->handleRequest(opCtx.get(), _inMessage) .then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)]( DbResponse dbresponse) mutable -> void { - auto guard = ThreadGuard(this); - // opCtx must be killed and delisted here so that the operation cannot show up in // currentOp results after the response reaches the client. The destruction is postponed // for later to mitigate its performance impact on the critical path of execution. @@ -491,25 +597,32 @@ void ServiceStateMachine::start(ServiceExecutorContext seCtx) { ServiceExecutorContext::set(_clientPtr, std::move(seCtx)); } - _executor()->schedule( - GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) { - // If this is the first run of the SSM, then update its state to Source - if (state() == State::Created) { - _state.store(State::Source); - } + // Set the executor decoration here to ensure `shared_from_this()` returns a valid pointer + ThreadGuardedExecutor::set(_clientPtr, ThreadGuardedExecutor(shared_from_this())); + + ThreadGuardedExecutor::get(_clientPtr) + ->schedule( + _executor(), + GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) { + // If this is the first run of the SSM, then update its state to Source + if (state() == State::Created) { + _state.store(State::Source); + } - _runOnce(); - })); + _runOnce(); + })); } void ServiceStateMachine::_runOnce() { - makeReadyFutureWith([&]() -> Future { - if (_inExhaust) { - return Status::OK(); - } else { - return _sourceMessage(); - } - }) + makeReadyFutureWith([] {}) + .thenRunOn(ThreadGuardedExecutor::get(_clientPtr)->wrapExecutor(_executor())) + .then([this]() -> Future { + if (_inExhaust) { + return Status::OK(); + } else { + return _sourceMessage(); + } + }) .then([this]() { return _processMessage(); }) .then([this]() -> Future { if (_outMessage.empty()) { @@ -518,7 +631,7 @@ void ServiceStateMachine::_runOnce() { return _sinkMessage(); }) - .getAsync([this, anchor = shared_from_this()](Status status) { + .getAsync([this](Status status) { // Destroy the opCtx (already killed) here, to potentially use the delay between // clients' requests to hide the destruction cost. if (MONGO_likely(_killedOpCtx)) { @@ -536,13 +649,19 @@ void ServiceStateMachine::_runOnce() { "error"_attr = status); terminate(); - _executor()->schedule(GuaranteedExecutor::enforceRunOnce( - [this, anchor = shared_from_this()](Status status) { _cleanupSession(); })); + ThreadGuardedExecutor::get(_clientPtr) + ->schedule(_executor(), + GuaranteedExecutor::enforceRunOnce( + [this, anchor = shared_from_this()](Status status) { + _cleanupSession(); + })); return; } - _executor()->schedule(GuaranteedExecutor::enforceRunOnce( - [this, anchor = shared_from_this()](Status status) { _runOnce(); })); + ThreadGuardedExecutor::get(_clientPtr) + ->schedule(_executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) { + _runOnce(); + })); }); } @@ -613,8 +732,6 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { } void ServiceStateMachine::_cleanupSession() { - auto guard = ThreadGuard(this); - // Ensure the delayed destruction of opCtx always happens before doing the cleanup. if (MONGO_likely(_killedOpCtx)) { _killedOpCtx.reset(); diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index 2c649442fee..1962715e81a 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -58,10 +58,12 @@ namespace transport { * user. */ class ServiceStateMachine : public std::enable_shared_from_this { +public: + class ThreadGuardedExecutor; + ServiceStateMachine(ServiceStateMachine&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&) = delete; -public: ServiceStateMachine(ServiceStateMachine&&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&&) = delete; -- cgit v1.2.1