summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-14 22:01:12 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-14 22:01:12 +0000
commitdabf0e0842dcedad2485cc19e65db262bb3172f6 (patch)
tree5e03c3092427298ddfec71e53565e54489932371
parent92e7ecf4ba43a483c91b17e5adfeedc2dc0c6fcf (diff)
downloadmongo-dabf0e0842dcedad2485cc19e65db262bb3172f6.tar.gz
SERVER-49107 Allow binding clients to executor threads
-rw-r--r--src/mongo/transport/service_state_machine.cpp177
-rw-r--r--src/mongo/transport/service_state_machine.h4
2 files changed, 150 insertions, 31 deletions
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index e9fa1ee95a6..a526f252b20 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -256,6 +256,120 @@ private:
std::string _oldThreadName;
};
+auto getThreadGuardedExecutor =
+ Client::declareDecoration<std::shared_ptr<ServiceStateMachine::ThreadGuardedExecutor>>();
+
+/*
+ * A ThreadGuardedExecutor is a client decoration that can wrap any OutOfLineExecutor to allow
+ * processing tasks while having the client object (i.e., `Client`) attached to the executor thread.
+ * In particular, scheduling tasks through a ThreadGuardedExecutor ensures that the corresponding
+ * client object is reachable through `Client::getCurrent()` and prevents concurrent accesses to the
+ * client object. A ThreadGuardedExecutor is only valid in the context of ServiceStateMachine.
+ * Also, any task scheduled through ThreadGuardedExecutor captures a reference (i.e., shared
+ * pointer) to both ServiceStateMachine and ThreadGuardedExecutor, thus accessing these objects
+ * through raw pointers (e.g., `this`) is considered safe.
+ */
+class ServiceStateMachine::ThreadGuardedExecutor
+ : public std::enable_shared_from_this<ServiceStateMachine::ThreadGuardedExecutor> {
+public:
+ // Wraps an instance of OutOfLineExecutor and delegates scheduling to ThreadGuardedExecutor
+ class WrappedExecutor : public OutOfLineExecutor,
+ public std::enable_shared_from_this<WrappedExecutor> {
+ public:
+ WrappedExecutor(const WrappedExecutor&) = delete;
+ WrappedExecutor(WrappedExecutor&&) = delete;
+
+ WrappedExecutor(std::shared_ptr<ThreadGuardedExecutor> parent, OutOfLineExecutor* executor)
+ : _parent(std::move(parent)), _executor(executor) {}
+
+ void schedule(OutOfLineExecutor::Task task) override {
+ _parent->schedule(_executor, std::move(task));
+ }
+
+ private:
+ std::shared_ptr<ThreadGuardedExecutor> const _parent;
+ OutOfLineExecutor* const _executor;
+ };
+
+ ThreadGuardedExecutor() = delete;
+ ThreadGuardedExecutor(const ThreadGuardedExecutor&) = delete;
+
+ explicit ThreadGuardedExecutor(std::weak_ptr<ServiceStateMachine> ssm) : _ssm(std::move(ssm)) {}
+
+ ThreadGuardedExecutor(ThreadGuardedExecutor&& other)
+ : _isBusy{other._isBusy.load()}, _ssm(other._ssm), _guard(std::move(other._guard)) {}
+
+ ~ThreadGuardedExecutor() {
+ invariant(!_isBusy.load());
+ invariant(!_guard.has_value());
+ }
+
+ static void set(Client* client, ThreadGuardedExecutor instance) {
+ auto& clientThreadGuardedExecutor = getThreadGuardedExecutor(client);
+ invariant(!clientThreadGuardedExecutor);
+ clientThreadGuardedExecutor = std::make_shared<decltype(instance)>(std::move(instance));
+ }
+
+ static std::shared_ptr<ThreadGuardedExecutor> get(const Client* client) {
+ return getThreadGuardedExecutor(client);
+ }
+
+ auto wrapExecutor(OutOfLineExecutor* executor) {
+ return std::make_shared<WrappedExecutor>(shared_from_this(), executor);
+ }
+
+ void schedule(OutOfLineExecutor* executor, OutOfLineExecutor::Task task) {
+ // Since `ThreadGuardedExecutor` is a client decoration, and SSM owns the client object,
+ // ServiceStateMachine must be present when tasks are scheduled here.
+ auto ssm = _ssm.lock();
+ invariant(ssm);
+
+ executor->schedule([this,
+ executor, // Valid as the executor must be present to run the task
+ task = std::move(task),
+ ssm = std::move(ssm),
+ anchor = shared_from_this()](Status status) mutable {
+ if (auto wasBusy = _isBusy.swap(true); wasBusy) {
+ // Reschedule if another executor thread is running a task for the client.
+ LOGV2_DEBUG(4910704, kDiagnosticLogLevel, "Rescheduling thread-guarded task");
+ schedule(executor, std::move(task));
+ return;
+ }
+
+ invariant(!_guard.has_value());
+ _guard = ThreadGuard(ssm.get());
+ LOGV2_DEBUG(4910701, kDiagnosticLogLevel, "Acquired ThreadGuard in scheduled task");
+
+ ON_BLOCK_EXIT([&] {
+ if (_guard.has_value()) {
+ releaseThreadGuard();
+ }
+ });
+
+ LOGV2_DEBUG(
+ 4910703, kDiagnosticLogLevel, "Started running task in a thread-guarded context");
+ task(status);
+ });
+ }
+
+ // Must only be called on the thread that owns the guard
+ void releaseThreadGuard() {
+ invariant(_guard.has_value() && _isBusy.load());
+ LOGV2_DEBUG(4910702, kDiagnosticLogLevel, "Releasing the ThreadGuard");
+ _guard = boost::none;
+ _isBusy.store(false);
+ }
+
+private:
+ static constexpr auto kDiagnosticLogLevel = 3;
+
+ // Set to `true` if the executor is busy running a task on behalf of the corresponding client.
+ AtomicWord<bool> _isBusy{false};
+
+ std::weak_ptr<ServiceStateMachine> _ssm;
+ boost::optional<ThreadGuard> _guard;
+};
+
ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
: _state{State::Created},
_serviceContext{client->getServiceContext()},
@@ -272,8 +386,6 @@ ServiceExecutor* ServiceStateMachine::_executor() {
}
Future<void> ServiceStateMachine::_sourceMessage() {
- auto guard = ThreadGuard(this);
-
invariant(_inMessage.empty());
invariant(_state.load() == State::Source);
_state.store(State::SourceWait);
@@ -300,8 +412,6 @@ Future<void> ServiceStateMachine::_sourceMessage() {
}
Future<void> ServiceStateMachine::_sinkMessage() {
- auto guard = ThreadGuard(this);
-
// Sink our response to the client
invariant(_state.load() == State::Process);
_state.store(State::SinkWait);
@@ -395,8 +505,6 @@ void ServiceStateMachine::_sinkCallback(Status status) {
}
Future<void> ServiceStateMachine::_processMessage() {
- auto guard = ThreadGuard(this);
-
invariant(!_inMessage.empty());
TrafficRecorder::get(_serviceContext)
@@ -426,8 +534,6 @@ Future<void> ServiceStateMachine::_processMessage() {
return _sep->handleRequest(opCtx.get(), _inMessage)
.then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)](
DbResponse dbresponse) mutable -> void {
- auto guard = ThreadGuard(this);
-
// opCtx must be killed and delisted here so that the operation cannot show up in
// currentOp results after the response reaches the client. The destruction is postponed
// for later to mitigate its performance impact on the critical path of execution.
@@ -491,25 +597,32 @@ void ServiceStateMachine::start(ServiceExecutorContext seCtx) {
ServiceExecutorContext::set(_clientPtr, std::move(seCtx));
}
- _executor()->schedule(
- GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
- // If this is the first run of the SSM, then update its state to Source
- if (state() == State::Created) {
- _state.store(State::Source);
- }
+ // Set the executor decoration here to ensure `shared_from_this()` returns a valid pointer
+ ThreadGuardedExecutor::set(_clientPtr, ThreadGuardedExecutor(shared_from_this()));
+
+ ThreadGuardedExecutor::get(_clientPtr)
+ ->schedule(
+ _executor(),
+ GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
+ // If this is the first run of the SSM, then update its state to Source
+ if (state() == State::Created) {
+ _state.store(State::Source);
+ }
- _runOnce();
- }));
+ _runOnce();
+ }));
}
void ServiceStateMachine::_runOnce() {
- makeReadyFutureWith([&]() -> Future<void> {
- if (_inExhaust) {
- return Status::OK();
- } else {
- return _sourceMessage();
- }
- })
+ makeReadyFutureWith([] {})
+ .thenRunOn(ThreadGuardedExecutor::get(_clientPtr)->wrapExecutor(_executor()))
+ .then([this]() -> Future<void> {
+ if (_inExhaust) {
+ return Status::OK();
+ } else {
+ return _sourceMessage();
+ }
+ })
.then([this]() { return _processMessage(); })
.then([this]() -> Future<void> {
if (_outMessage.empty()) {
@@ -518,7 +631,7 @@ void ServiceStateMachine::_runOnce() {
return _sinkMessage();
})
- .getAsync([this, anchor = shared_from_this()](Status status) {
+ .getAsync([this](Status status) {
// Destroy the opCtx (already killed) here, to potentially use the delay between
// clients' requests to hide the destruction cost.
if (MONGO_likely(_killedOpCtx)) {
@@ -536,13 +649,19 @@ void ServiceStateMachine::_runOnce() {
"error"_attr = status);
terminate();
- _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
- [this, anchor = shared_from_this()](Status status) { _cleanupSession(); }));
+ ThreadGuardedExecutor::get(_clientPtr)
+ ->schedule(_executor(),
+ GuaranteedExecutor::enforceRunOnce(
+ [this, anchor = shared_from_this()](Status status) {
+ _cleanupSession();
+ }));
return;
}
- _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
- [this, anchor = shared_from_this()](Status status) { _runOnce(); }));
+ ThreadGuardedExecutor::get(_clientPtr)
+ ->schedule(_executor(), GuaranteedExecutor::enforceRunOnce([this](Status status) {
+ _runOnce();
+ }));
});
}
@@ -613,8 +732,6 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
}
void ServiceStateMachine::_cleanupSession() {
- auto guard = ThreadGuard(this);
-
// Ensure the delayed destruction of opCtx always happens before doing the cleanup.
if (MONGO_likely(_killedOpCtx)) {
_killedOpCtx.reset();
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 2c649442fee..1962715e81a 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -58,10 +58,12 @@ namespace transport {
* user.
*/
class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {
+public:
+ class ThreadGuardedExecutor;
+
ServiceStateMachine(ServiceStateMachine&) = delete;
ServiceStateMachine& operator=(ServiceStateMachine&) = delete;
-public:
ServiceStateMachine(ServiceStateMachine&&) = delete;
ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;