summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_state_machine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_state_machine.cpp')
-rw-r--r--src/mongo/transport/service_state_machine.cpp295
1 files changed, 40 insertions, 255 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 0cb587426e9..acc60bc3fcb 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -38,6 +38,7 @@
#include "mongo/base/status.h"
#include "mongo/config.h"
#include "mongo/db/client.h"
+#include "mongo/db/client_strand.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/traffic_recorder.h"
@@ -56,7 +57,6 @@
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point.h"
@@ -204,19 +204,11 @@ public:
*/
enum class Ownership { kUnowned, kOwned, kStatic };
- /*
- * A class that wraps up lifetime management of the _client and _threadName for each step in
- * runOnce();
- */
- class ThreadGuard;
- class ThreadGuardedExecutor;
-
Impl(ServiceContext::UniqueClient client)
: _state{State::Created},
_serviceContext{client->getServiceContext()},
_sep{_serviceContext->getServiceEntryPoint()},
- _client{std::move(client)},
- _clientPtr{_client.get()} {}
+ _clientStrand{ClientStrand::make(std::move(client))} {}
void start(ServiceExecutorContext seCtx);
@@ -257,8 +249,7 @@ public:
void sinkCallback(Status status);
/*
- * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
- * before waiting on the TL.
+ * Source/Sink message from the TransportLayer.
*/
Future<void> sourceMessage();
Future<void> sinkMessage();
@@ -290,14 +281,14 @@ public:
* Gets the transport::Session associated with this connection
*/
const transport::SessionHandle& session() {
- return _clientPtr->session();
+ return _clientStrand->getClientPointer()->session();
}
/*
* Gets the transport::ServiceExecutor associated with this connection.
*/
ServiceExecutor* executor() {
- return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor();
+ return ServiceExecutorContext::get(_clientStrand->getClientPointer())->getServiceExecutor();
}
private:
@@ -307,8 +298,7 @@ private:
ServiceEntryPoint* const _sep;
transport::SessionHandle _sessionHandle;
- ServiceContext::UniqueClient _client;
- Client* _clientPtr;
+ ClientStrandPtr _clientStrand;
std::function<void()> _cleanupHook;
bool _inExhaust = false;
@@ -323,214 +313,6 @@ private:
ServiceContext::UniqueOperationContext _opCtx;
};
-/*
- * This class wraps up the logic for swapping/unswapping the Client when transitioning between
- * states.
- *
- * In debug builds this also ensures that only one thread is working on the SSM at once.
- */
-class ServiceStateMachine::Impl::ThreadGuard {
- ThreadGuard(ThreadGuard&) = delete;
- ThreadGuard& operator=(ThreadGuard&) = delete;
-
-public:
- explicit ThreadGuard(ServiceStateMachine::Impl* ssm) : _ssm{ssm} {
- invariant(_ssm);
-
- if (_ssm->_clientPtr == Client::getCurrent()) {
- // We're not the first on this thread, nothing more to do.
- return;
- }
-
- auto& client = _ssm->_client;
- invariant(client);
-
- // Set up the thread name
- auto oldThreadName = getThreadName();
- const auto& threadName = client->desc();
- if (oldThreadName != threadName) {
- _oldThreadName = oldThreadName.toString();
- setThreadName(threadName);
- }
-
- // Swap the current Client so calls to cc() work as expected
- Client::setCurrent(std::move(client));
- _haveTakenOwnership = true;
- }
-
- // Constructing from a moved ThreadGuard invalidates the other thread guard.
- ThreadGuard(ThreadGuard&& other)
- : _ssm{std::exchange(other._ssm, nullptr)},
- _haveTakenOwnership{std::exchange(_haveTakenOwnership, false)} {}
-
- ThreadGuard& operator=(ThreadGuard&& other) {
- _ssm = std::exchange(other._ssm, nullptr);
- _haveTakenOwnership = std::exchange(other._haveTakenOwnership, false);
- return *this;
- };
-
- ThreadGuard() = delete;
-
- ~ThreadGuard() {
- release();
- }
-
- explicit operator bool() const {
- return _ssm;
- }
-
- void release() {
- if (!_ssm) {
- // We've been released or moved from.
- return;
- }
-
- // If we have a ServiceStateMachine pointer, then it should control the current Client.
- invariant(_ssm->_clientPtr == Client::getCurrent());
-
- if (auto haveTakenOwnership = std::exchange(_haveTakenOwnership, false);
- !haveTakenOwnership) {
- // Reset our pointer so that we cannot release again.
- _ssm = nullptr;
-
- // We are not the original owner, nothing more to do.
- return;
- }
-
- // Reclaim the client.
- _ssm->_client = Client::releaseCurrent();
-
- // Reset our pointer so that we cannot release again.
- _ssm = nullptr;
-
- if (!_oldThreadName.empty()) {
- // Reset the old thread name.
- setThreadName(_oldThreadName);
- }
- }
-
-private:
- ServiceStateMachine::Impl* _ssm = nullptr;
-
- bool _haveTakenOwnership = false;
- std::string _oldThreadName;
-};
-
-auto getThreadGuardedExecutor =
- Client::declareDecoration<std::shared_ptr<ServiceStateMachine::Impl::ThreadGuardedExecutor>>();
-
-/*
- * A ThreadGuardedExecutor is a client decoration that can wrap any OutOfLineExecutor to allow
- * processing tasks while having the client object (i.e., `Client`) attached to the executor thread.
- * In particular, scheduling tasks through a ThreadGuardedExecutor ensures that the corresponding
- * client object is reachable through `Client::getCurrent()` and prevents concurrent accesses to the
- * client object. A ThreadGuardedExecutor is only valid in the context of ServiceStateMachine.
- * Also, any task scheduled through ThreadGuardedExecutor captures a reference (i.e., shared
- * pointer) to both ServiceStateMachine and ThreadGuardedExecutor, thus accessing these objects
- * through raw pointers (e.g., `this`) is considered safe.
- */
-class ServiceStateMachine::Impl::ThreadGuardedExecutor
- : public std::enable_shared_from_this<ServiceStateMachine::Impl::ThreadGuardedExecutor> {
-public:
- // Wraps an instance of OutOfLineExecutor and delegates scheduling to ThreadGuardedExecutor
- class WrappedExecutor : public OutOfLineExecutor,
- public std::enable_shared_from_this<WrappedExecutor> {
- public:
- WrappedExecutor(const WrappedExecutor&) = delete;
- WrappedExecutor(WrappedExecutor&&) = delete;
-
- WrappedExecutor(std::shared_ptr<ThreadGuardedExecutor> parent, OutOfLineExecutor* executor)
- : _parent(std::move(parent)), _executor(executor) {}
-
- void schedule(OutOfLineExecutor::Task task) override {
- _parent->schedule(_executor, std::move(task));
- }
-
- private:
- std::shared_ptr<ThreadGuardedExecutor> const _parent;
- OutOfLineExecutor* const _executor;
- };
-
- ThreadGuardedExecutor() = delete;
- ThreadGuardedExecutor(const ThreadGuardedExecutor&) = delete;
-
- explicit ThreadGuardedExecutor(std::weak_ptr<ServiceStateMachine::Impl> ssm)
- : _ssm(std::move(ssm)) {}
-
- ThreadGuardedExecutor(ThreadGuardedExecutor&& other)
- : _isBusy{other._isBusy.load()}, _ssm(other._ssm), _guard(std::move(other._guard)) {}
-
- ~ThreadGuardedExecutor() {
- invariant(!_isBusy.load());
- invariant(!_guard.has_value());
- }
-
- static void set(Client* client, ThreadGuardedExecutor instance) {
- auto& clientThreadGuardedExecutor = getThreadGuardedExecutor(client);
- invariant(!clientThreadGuardedExecutor);
- clientThreadGuardedExecutor = std::make_shared<decltype(instance)>(std::move(instance));
- }
-
- static std::shared_ptr<ThreadGuardedExecutor> get(const Client* client) {
- return getThreadGuardedExecutor(client);
- }
-
- auto wrapExecutor(OutOfLineExecutor* executor) {
- return std::make_shared<WrappedExecutor>(shared_from_this(), executor);
- }
-
- void schedule(OutOfLineExecutor* executor, OutOfLineExecutor::Task task) {
- // Since `ThreadGuardedExecutor` is a client decoration, and SSM owns the client object,
- // ServiceStateMachine must be present when tasks are scheduled here.
- auto ssm = _ssm.lock();
- invariant(ssm);
-
- executor->schedule([this,
- executor, // Valid as the executor must be present to run the task
- task = std::move(task),
- ssm = std::move(ssm),
- anchor = shared_from_this()](Status status) mutable {
- if (auto wasBusy = _isBusy.swap(true); wasBusy) {
- // Reschedule if another executor thread is running a task for the client.
- LOGV2_DEBUG(4910704, kDiagnosticLogLevel, "Rescheduling thread-guarded task");
- schedule(executor, std::move(task));
- return;
- }
-
- invariant(!_guard.has_value());
- _guard.emplace(ssm.get());
- LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Acquired ThreadGuard in scheduled task");
-
- ON_BLOCK_EXIT([&] {
- if (_guard.has_value()) {
- releaseThreadGuard();
- }
- });
-
- LOGV2_DEBUG(
- 4910703, kDiagnosticLogLevel, "Started running task in a thread-guarded context");
- task(status);
- });
- }
-
- // Must only be called on the thread that owns the guard
- void releaseThreadGuard() {
- invariant(_guard.has_value() && _isBusy.load());
- LOGV2_DEBUG(4910702, kDiagnosticLogLevel, "Releasing the ThreadGuard");
- _guard = boost::none;
- _isBusy.store(false);
- }
-
-private:
- static constexpr auto kDiagnosticLogLevel = 3;
-
- // Set to `true` if the executor is busy running a task on behalf of the corresponding client.
- AtomicWord<bool> _isBusy{false};
-
- std::weak_ptr<ServiceStateMachine::Impl> _ssm;
- boost::optional<ThreadGuard> _guard;
-};
-
Future<void> ServiceStateMachine::Impl::sourceMessage() {
invariant(_inMessage.empty());
invariant(_state.load() == State::Source);
@@ -583,8 +365,6 @@ Future<void> ServiceStateMachine::Impl::sinkMessage() {
}
void ServiceStateMachine::Impl::sourceCallback(Status status) {
- auto guard = ThreadGuard(this);
-
invariant(state() == State::SourceWait);
auto remote = session()->remote();
@@ -626,8 +406,6 @@ void ServiceStateMachine::Impl::sourceCallback(Status status) {
}
void ServiceStateMachine::Impl::sinkCallback(Status status) {
- auto guard = ThreadGuard(this);
-
invariant(state() == State::SinkWait);
// If there was an error sinking the message to the client, then we should print an error and
@@ -736,25 +514,25 @@ Future<void> ServiceStateMachine::Impl::processMessage() {
void ServiceStateMachine::Impl::start(ServiceExecutorContext seCtx) {
{
- stdx::lock_guard lk(*_clientPtr);
-
- ServiceExecutorContext::set(_clientPtr, std::move(seCtx));
+ auto client = _clientStrand->getClientPointer();
+ stdx::lock_guard lk(*client);
+ ServiceExecutorContext::set(client, std::move(seCtx));
}
- // Set the executor decoration here to ensure `shared_from_this()` returns a valid pointer
- ThreadGuardedExecutor::set(_clientPtr, ThreadGuardedExecutor(shared_from_this()));
+ invariant(_state.swap(State::Source) == State::Created);
- ThreadGuardedExecutor::get(_clientPtr)
- ->schedule(
- executor(),
- GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
- // If this is the first run of the SSM, then update its state to Source
- if (state() == State::Created) {
- _state.store(State::Source);
- }
+ auto cb = [this, anchor = shared_from_this()](Status status) {
+ _clientStrand->run([&] {
+ if (ErrorCodes::isCancelationError(status)) {
+ cleanupSession();
+ return;
+ }
+ invariant(status);
- runOnce();
- }));
+ runOnce();
+ });
+ };
+ executor()->schedule(std::move(cb));
}
void ServiceStateMachine::Impl::runOnce() {
@@ -791,19 +569,25 @@ void ServiceStateMachine::Impl::runOnce() {
"error"_attr = status);
terminate();
- ThreadGuardedExecutor::get(_clientPtr)
- ->schedule(executor(),
- GuaranteedExecutor::enforceRunOnce(
- [this, anchor = shared_from_this()](Status status) {
- cleanupSession();
- }));
+ auto cb = [this, anchor = shared_from_this()](Status status) {
+ _clientStrand->run([&] { cleanupSession(); });
+ };
+ executor()->schedule(std::move(cb));
return;
}
- ThreadGuardedExecutor::get(_clientPtr)
- ->schedule(executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) {
- runOnce();
- }));
+ auto cb = [this, anchor = shared_from_this()](Status status) {
+ _clientStrand->run([&] {
+ if (ErrorCodes::isCancelationError(status)) {
+ cleanupSession();
+ return;
+ }
+ invariant(status);
+
+ runOnce();
+ });
+ };
+ executor()->schedule(std::move(cb));
});
}
@@ -878,8 +662,9 @@ void ServiceStateMachine::Impl::cleanupSession() {
cleanupExhaustResources();
{
- stdx::lock_guard lk(*_clientPtr);
- transport::ServiceExecutorContext::reset(_clientPtr);
+ auto client = _clientStrand->getClientPointer();
+ stdx::lock_guard lk(*client);
+ transport::ServiceExecutorContext::reset(client);
}
if (auto cleanupHook = std::exchange(_cleanupHook, {})) {