summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-08-10 17:54:46 +0000
committerBen Caimano <ben.caimano@10gen.com>2020-10-23 21:47:37 +0000
commit243a506fca5b2cca017192ff455e1520d9783a9e (patch)
tree789e602c68c83af9f897b320bac70ed802463450
parent8cff99b2c4710a0e8abb7a0631d24385092697b4 (diff)
downloadmongo-243a506fca5b2cca017192ff455e1520d9783a9e.tar.gz
SERVER-49109 ServiceExecutorFixed tracks work and sessions
This commit also adds a server parameter to start on the "borrowed" threading model and introduces an evergreen variant for it.
-rw-r--r--etc/evergreen.yml30
-rw-r--r--src/mongo/base/error_codes.yml3
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp24
-rw-r--r--src/mongo/transport/service_executor.cpp31
-rw-r--r--src/mongo/transport/service_executor.h21
-rw-r--r--src/mongo/transport/service_executor.idl21
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp364
-rw-r--r--src/mongo/transport/service_executor_fixed.h97
-rw-r--r--src/mongo/transport/service_executor_test.cpp48
9 files changed, 488 insertions, 151 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index de9f17fc7f7..997d9ef897d 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -12600,6 +12600,36 @@ buildvariants:
distros:
- ubuntu1804-build
+- name: enterprise-ubuntu-fixed-service-executor-1604-64-bit
+ display_name: "~ Enterprise Ubuntu 16.04 (with FixedServiceExecutor)"
+ batchtime: 1440 # 1 day
+ run_on:
+ - ubuntu1604-test
+ modules:
+ - enterprise
+ expansions:
+ scons_cache_scope: shared
+ compile_flags: MONGO_DISTMOD=ubuntu1604 -j$(grep -c ^processor /proc/cpuinfo) --variables-files=etc/scons/mongodbtoolchain_v3_gcc.vars
+ multiversion_platform: ubuntu1604
+ multiversion_edition: enterprise
+ test_flags: >-
+ --mongosSetParameters="{initialServiceExecutorThreadingModel: borrowed}"
+ --mongodSetParameters="{initialServiceExecutorThreadingModel: borrowed}"
+
+ tasks:
+ - name: compile_all_run_unittests_TG
+ distros:
+ - ubuntu1604-build
+ - name: .aggregation !.no_async
+ - name: .sharding .auth
+ - name: .sharding .causally_consistent !.wo_snapshot
+ - name: .concurrency .common !.kill_terminate
+ - name: .integration !.audit
+ - name: .jscore .common
+ - name: .logical_session_cache .one_sec
+ - name: .sharding .jscore !.wo_snapshot !.multi_stmt
+ - name: .sharding .common !.csrs
+
- name: enterprise-ubuntu-scanning-replica-set-monitor-1604-64-bit
display_name: "~ Enterprise Ubuntu 16.04 (with ScanningReplicaSetMonitor)"
batchtime: 1440 # 1 day
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 05aaaaa15f4..a6317f1c671 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -405,6 +405,9 @@ error_codes:
- {code: 329, name: FailedToRunWithReplyBuilder}
+ # Internal error
+ - {code: 330, name: ServiceExecutorInShutdown, categories: [ShutdownError,CancelationError]}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 38093644e2c..400979f14d9 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -41,6 +41,7 @@
#include "mongo/logv2/log.h"
#include "mongo/transport/ismaster_metrics.h"
#include "mongo/transport/service_executor.h"
+#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/session.h"
#include "mongo/util/processinfo.h"
@@ -143,15 +144,13 @@ Status ServiceEntryPointImpl::start() {
}
}
- // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
- // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
- // return status;
- // }
+ if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
+ return status;
+ }
return Status::OK();
}
-// TODO: explicitly start on the fixed executor
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
const auto& remoteAddr = session->remoteAddr();
@@ -221,7 +220,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
});
auto seCtx = transport::ServiceExecutorContext{};
- seCtx.setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated);
+ seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel());
seCtx.setCanUseReserved(canOverrideMaxConns);
ssm->start(std::move(seCtx));
}
@@ -284,13 +283,12 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
lk.unlock();
- // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
- // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
- // !status.isOK()) {
- // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
- // }
+ timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
+ !status.isOK()) {
+ LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+ }
timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
timeout = std::max(Milliseconds{0}, timeout - timeSpent);
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index c5fd302d64c..0a8815bdd22 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -47,23 +47,44 @@ namespace transport {
namespace {
static constexpr auto kDiagnosticLogLevel = 4;
+static constexpr auto kThreadingModelDedicatedStr = "dedicated"_sd;
+static constexpr auto kThreadingModelBorrowedStr = "borrowed"_sd;
+
+auto gInitialThreadingModel = ServiceExecutor::ThreadingModel::kDedicated;
+
auto getServiceExecutorStats =
ServiceContext::declareDecoration<synchronized_value<ServiceExecutorStats>>();
auto getServiceExecutorContext =
Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
} // namespace
-StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
+StringData toString(ServiceExecutor::ThreadingModel threadingModel) {
switch (threadingModel) {
- case ServiceExecutorContext::ThreadingModel::kDedicated:
- return "Dedicated"_sd;
- case ServiceExecutorContext::ThreadingModel::kBorrowed:
- return "Borrowed"_sd;
+ case ServiceExecutor::ThreadingModel::kDedicated:
+ return kThreadingModelDedicatedStr;
+ case ServiceExecutor::ThreadingModel::kBorrowed:
+ return kThreadingModelBorrowedStr;
default:
MONGO_UNREACHABLE;
}
}
+Status ServiceExecutor::setInitialThreadingModel(StringData value) noexcept {
+ if (value == kThreadingModelDedicatedStr) {
+ gInitialThreadingModel = ServiceExecutor::ThreadingModel::kDedicated;
+ } else if (value == kThreadingModelBorrowedStr) {
+ gInitialThreadingModel = ServiceExecutor::ThreadingModel::kBorrowed;
+ } else {
+ MONGO_UNREACHABLE;
+ }
+
+ return Status::OK();
+}
+
+auto ServiceExecutor::getInitialThreadingModel() noexcept -> ThreadingModel {
+ return gInitialThreadingModel;
+}
+
ServiceExecutorStats ServiceExecutorStats::get(ServiceContext* ctx) noexcept {
return getServiceExecutorStats(ctx).get();
}
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 773f9372913..95f5bac68a3 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -51,6 +51,20 @@ namespace transport {
*/
class ServiceExecutor : public OutOfLineExecutor {
public:
+ /**
+ * An enum to indicate if a ServiceExecutor should use dedicated or borrowed threading
+ * resources.
+ */
+ enum class ThreadingModel {
+ kBorrowed,
+ kDedicated,
+ };
+
+ friend StringData toString(ThreadingModel threadingModel);
+
+ static Status setInitialThreadingModel(StringData value) noexcept;
+ static ThreadingModel getInitialThreadingModel() noexcept;
+
virtual ~ServiceExecutor() = default;
using Task = unique_function<void()>;
enum ScheduleFlags {
@@ -127,10 +141,7 @@ public:
*/
class ServiceExecutorContext {
public:
- enum ThreadingModel {
- kBorrowed,
- kDedicated,
- };
+ using ThreadingModel = ServiceExecutor::ThreadingModel;
/**
* Get a pointer to the ServiceExecutorContext for a given client.
@@ -202,8 +213,6 @@ public:
ServiceExecutor* getServiceExecutor() noexcept;
private:
- friend StringData toString(ThreadingModel threadingModel);
-
Client* _client = nullptr;
ServiceEntryPoint* _sep = nullptr;
diff --git a/src/mongo/transport/service_executor.idl b/src/mongo/transport/service_executor.idl
index d380492ec62..d7235440de8 100644
--- a/src/mongo/transport/service_executor.idl
+++ b/src/mongo/transport/service_executor.idl
@@ -28,8 +28,18 @@
global:
cpp_namespace: "mongo::transport"
+ cpp_includes:
+ - "mongo/transport/service_executor.h"
server_parameters:
+ initialServiceExecutorThreadingModel:
+ description: >-
+ Start new client connections using an executor that follows this model.
+ set_at: [ startup ]
+ cpp_vartype: "std::string"
+ cpp_varname: "initialServiceExecutorThreadingModel"
+ on_update: "ServiceExecutor::setInitialThreadingModel"
+ default: "dedicated"
synchronousServiceExecutorRecursionLimit:
description: >-
Tasks may recurse further if their recursion depth is less than this value.
@@ -53,3 +63,14 @@ server_parameters:
cpp_vartype: 'AtomicWord<int>'
cpp_varname: reservedServiceExecutorRecursionLimit
default: 8
+
+ fixedServiceExecutorThreadLimit:
+ description: >-
+ The fixed service executor (thread model "borrowed") can only maintain a count of threads
+ less than this value.
+ set_at: [ startup ]
+ cpp_vartype: "int"
+ cpp_varname: "fixedServiceExecutorThreadLimit"
+ default: 1000
+ validator:
+ gte: 10
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b9b5424872e..fb34101f213 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -35,8 +35,10 @@
#include "mongo/logv2/log.h"
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/testing_proctor.h"
#include "mongo/util/thread_safety_context.h"
namespace mongo {
@@ -52,47 +54,166 @@ constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "fixed"_sd;
const auto getServiceExecutorFixed =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
+ ServiceContext::declareDecoration<std::shared_ptr<ServiceExecutorFixed>>();
const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
"ServiceExecutorFixed", [](ServiceContext* ctx) {
+ auto limits = ThreadPool::Limits{};
+ limits.minThreads = 0;
+ limits.maxThreads = fixedServiceExecutorThreadLimit;
getServiceExecutorFixed(ctx) =
- std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
+ std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits));
}};
} // namespace
-ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
- : _options(std::move(options)) {
- _options.onCreateThread =
- [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable {
- _executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this());
- if (onCreate) {
- onCreate(name);
- }
- };
- _threadPool = std::make_unique<ThreadPool>(_options);
+class ServiceExecutorFixed::ExecutorThreadContext {
+public:
+ ExecutorThreadContext(ServiceExecutorFixed* serviceExecutor);
+ ~ExecutorThreadContext();
+
+ ExecutorThreadContext(ExecutorThreadContext&&) = delete;
+ ExecutorThreadContext(const ExecutorThreadContext&) = delete;
+
+ template <typename Task>
+ void run(Task&& task) {
+ // Yield here to improve concurrency, especially when there are more executor threads
+ // than CPU cores.
+ stdx::this_thread::yield();
+ _executor->_stats.tasksStarted.fetchAndAdd(1);
+ _recursionDepth++;
+
+ ON_BLOCK_EXIT([&] {
+ _recursionDepth--;
+ _executor->_stats.tasksEnded.fetchAndAdd(1);
+
+ auto lk = stdx::lock_guard(_executor->_mutex);
+ _executor->_checkForShutdown(lk);
+ });
+
+ std::forward<Task>(task)();
+ }
+
+ int getRecursionDepth() const {
+ return _recursionDepth;
+ }
+
+private:
+ ServiceExecutorFixed* const _executor;
+ int _recursionDepth = 0;
+};
+
+ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext(
+ ServiceExecutorFixed* serviceExecutor)
+ : _executor(serviceExecutor) {
+ _executor->_stats.threadsStarted.fetchAndAdd(1);
+ hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet();
+}
+
+ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() {
+ auto ended = _executor->_stats.threadsEnded.addAndFetch(1);
+ auto started = _executor->_stats.threadsStarted.loadRelaxed();
+ if (ended == started) {
+ hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet();
+ }
+}
+
+thread_local std::unique_ptr<ServiceExecutorFixed::ExecutorThreadContext>
+ ServiceExecutorFixed::_executorContext;
+
+ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limits limits)
+ : _svcCtx{ctx}, _options(std::move(limits)) {
+ _options.poolName = "ServiceExecutorFixed";
+ _options.onCreateThread = [this](const auto&) {
+ _executorContext = std::make_unique<ExecutorThreadContext>(this);
+ };
+
+ _threadPool = std::make_shared<ThreadPool>(_options);
}
ServiceExecutorFixed::~ServiceExecutorFixed() {
- invariant(!_canScheduleWork.load());
- if (_state == State::kNotStarted)
- return;
+ switch (_state) {
+ case State::kNotStarted:
+ return;
+ case State::kRunning: {
+ // We should not be running while in this destructor.
+ MONGO_UNREACHABLE;
+ }
+ case State::kStopping:
+ case State::kStopped: {
+ // We can go ahead and attempt to join our thread pool.
+ } break;
+ default: { MONGO_UNREACHABLE; }
+ }
+
+ LOGV2_DEBUG(4910502,
+ kDiagnosticLogLevel,
+ "Shutting down pool for fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
- // Ensures we always call "shutdown" after staring the service executor
- invariant(_state == State::kStopped);
+ // We only can desturct when we have joined all of our tasks and canceled all of our sessions.
+ // This thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are
+ // stiil blocking. If so, we block until they finish here.
_threadPool->shutdown();
_threadPool->join();
- invariant(_numRunningExecutorThreads.load() == 0);
+
+ invariant(_threadsRunning() == 0);
+ invariant(_tasksRunning() == 0);
+ invariant(_waiters.empty());
}
Status ServiceExecutorFixed::start() {
- stdx::lock_guard<Latch> lk(_mutex);
- auto oldState = std::exchange(_state, State::kRunning);
- invariant(oldState == State::kNotStarted);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ switch (_state) {
+ case State::kNotStarted: {
+ // Time to start
+ _state = State::kRunning;
+ } break;
+ case State::kRunning: {
+ return Status::OK();
+ }
+ case State::kStopping:
+ case State::kStopped: {
+ return {ErrorCodes::ServiceExecutorInShutdown,
+ "ServiceExecutorFixed is already stopping or stopped"};
+ }
+ default: { MONGO_UNREACHABLE; }
+ };
+ }
+
+ LOGV2_DEBUG(4910501,
+ kDiagnosticLogLevel,
+ "Starting fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
+
_threadPool->startup();
- _canScheduleWork.store(true);
- LOGV2_DEBUG(
- 4910501, 3, "Started fixed thread-pool service executor", "name"_attr = _options.poolName);
+
+ if (!_svcCtx) {
+ // For some tests, we do not have a ServiceContext.
+ invariant(TestingProctor::instance().isEnabled());
+ return Status::OK();
+ }
+
+ auto tl = _svcCtx->getTransportLayer();
+ invariant(tl);
+
+ auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
+ invariant(reactor);
+ _threadPool->schedule([this, reactor](Status) {
+ {
+ // Check to make sure we haven't been shutdown already. Note that there is still a brief
+ // race that immediately follows this check. ASIOReactor::stop() is not permanent, thus
+ // our run() could "restart" the reactor.
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_state != kRunning) {
+ return;
+ }
+ }
+
+ // Start running on the reactor immediately.
+ reactor->run();
+ });
+
return Status::OK();
}
@@ -103,37 +224,115 @@ ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
}
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
- auto waitForShutdown = [&]() mutable -> Status {
- stdx::unique_lock<Latch> lk(_mutex);
- bool success = _shutdownCondition.wait_for(lk, timeout.toSystemDuration(), [this] {
- return _numRunningExecutorThreads.load() == 0;
- });
- return success ? Status::OK()
- : Status(ErrorCodes::ExceededTimeLimit,
- "Failed to shutdown all executor threads within the time limit");
- };
-
- LOGV2_DEBUG(4910502,
- 3,
+ LOGV2_DEBUG(4910503,
+ kDiagnosticLogLevel,
"Shutting down fixed thread-pool service executor",
"name"_attr = _options.poolName);
{
- stdx::lock_guard<Latch> lk(_mutex);
- _canScheduleWork.store(false);
+ auto lk = stdx::unique_lock(_mutex);
+
+ switch (_state) {
+ case State::kNotStarted:
+ case State::kRunning: {
+ _state = State::kStopping;
+
+ for (auto& waiter : _waiters) {
+ // Cancel any session we own.
+ waiter.session->cancelAsyncOperations();
+ }
+
+ // There may not be outstanding threads, check for shutdown now.
+ _checkForShutdown(lk);
- auto oldState = std::exchange(_state, State::kStopped);
- if (oldState != State::kStopped) {
- _threadPool->shutdown();
+ if (_state == State::kStopped) {
+ // We were able to become stopped immediately.
+ return Status::OK();
+ }
+ } break;
+ case State::kStopping: {
+ // Just nead to wait it out.
+ } break;
+ case State::kStopped: {
+ // Totally done.
+ return Status::OK();
+ } break;
+ default: { MONGO_UNREACHABLE; }
}
}
- return waitForShutdown();
+ LOGV2_DEBUG(4910504,
+ kDiagnosticLogLevel,
+ "Waiting for shutdown of fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
+
+ // There is a world where we are able to simply do a timed wait upon a future chain. However,
+ // that world likely requires an OperationContext available through shutdown.
+ auto lk = stdx::unique_lock(_mutex);
+ if (!_shutdownCondition.wait_for(
+ lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) {
+ return Status(ErrorCodes::ExceededTimeLimit,
+ "Failed to shutdown all executor threads within the time limit");
+ }
+
+ return Status::OK();
+}
+
+void ServiceExecutorFixed::_checkForShutdown(WithLock) {
+ if (_state == State::kRunning) {
+ // We're actively running.
+ return;
+ }
+ invariant(_state != State::kNotStarted);
+
+ if (!_waiters.empty()) {
+ // We still have some in wait.
+ return;
+ }
+
+ auto tasksLeft = _tasksLeft();
+ if (tasksLeft > 0) {
+ // We have tasks remaining.
+ return;
+ }
+ invariant(tasksLeft == 0);
+
+ // We have achieved a soft form of shutdown:
+ // - _state != kRunning means that there will be no new external tasks or waiters.
+ // - _waiters.empty() means that all network waits have finished and there will be no new
+ // internal tasks.
+ // - _tasksLeft() == 0 means that all tasks, both internal and external have finished.
+ //
+ // From this point on, all of our threads will be idle. When the dtor runs, the thread pool will
+ // experience a trivial shutdown() and join().
+ _state = State::kStopped;
+
+ LOGV2_DEBUG(
+ 4910505, kDiagnosticLogLevel, "Finishing shutdown", "name"_attr = _options.poolName);
+ _shutdownCondition.notify_one();
+
+ if (!_svcCtx) {
+ // For some tests, we do not have a ServiceContext.
+ invariant(TestingProctor::instance().isEnabled());
+ return;
+ }
+
+ auto tl = _svcCtx->getTransportLayer();
+ invariant(tl);
+
+ auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
+ invariant(reactor);
+ reactor->stop();
}
Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
- if (!_canScheduleWork.load()) {
- return Status(ErrorCodes::ShutdownInProgress, "Executor is not running");
+ {
+ auto lk = stdx::unique_lock(_mutex);
+ if (_state != State::kRunning) {
+ return kInShutdown;
+ }
+
+ _stats.tasksScheduled.fetchAndAdd(1);
}
auto mayExecuteTaskInline = [&] {
@@ -157,31 +356,70 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
hangBeforeSchedulingServiceExecutorFixedTask.pauseWhileSet();
- // May throw if an attempt is made to schedule after the thread pool is shutdown.
- try {
- _threadPool->schedule([task = std::move(task)](Status status) mutable {
- internalAssert(status);
- invariant(_executorContext);
- _executorContext->run(std::move(task));
- });
- } catch (DBException& e) {
- return e.toStatus();
- }
+ _threadPool->schedule([this, task = std::move(task)](Status status) mutable {
+ invariant(status);
+
+ _executorContext->run([&] { task(); });
+ });
return Status::OK();
}
+void ServiceExecutorFixed::_schedule(OutOfLineExecutor::Task task) noexcept {
+ {
+ auto lk = stdx::unique_lock(_mutex);
+ if (_state != State::kRunning) {
+ lk.unlock();
+
+ task(kInShutdown);
+ return;
+ }
+
+ _stats.tasksScheduled.fetchAndAdd(1);
+ }
+
+ _threadPool->schedule([this, task = std::move(task)](Status status) mutable {
+ _executorContext->run([&] { task(std::move(status)); });
+ });
+}
+
void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) {
invariant(session);
+
+ auto waiter = Waiter{session, std::move(onCompletionCallback)};
+
+ WaiterList::iterator it;
+ {
+ // Make sure we're still allowed to schedule and track the session
+ auto lk = stdx::unique_lock(_mutex);
+ if (_state != State::kRunning) {
+ lk.unlock();
+ waiter.onCompletionCallback(kInShutdown);
+ return;
+ }
+
+ it = _waiters.emplace(_waiters.end(), std::move(waiter));
+ }
+
session->asyncWaitForData()
.thenRunOn(shared_from_this())
- .getAsync(std::move(onCompletionCallback));
+ .getAsync([this, anchor = shared_from_this(), it](Status status) mutable {
+ Waiter waiter;
+ {
+ // Remove our waiter from the list.
+ auto lk = stdx::unique_lock(_mutex);
+ waiter = std::exchange(*it, {});
+ _waiters.erase(it);
+ }
+
+ waiter.onCompletionCallback(std::move(status));
+ });
}
void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
*bob << kExecutorLabel << kExecutorName << kThreadsRunning
- << static_cast<int>(_numRunningExecutorThreads.load());
+ << static_cast<int>(_threadsRunning());
}
int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
@@ -189,19 +427,5 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
return _executorContext->getRecursionDepth();
}
-ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext(
- std::weak_ptr<ServiceExecutorFixed> serviceExecutor)
- : _executor(std::move(serviceExecutor)) {
- _adjustRunningExecutorThreads(1);
- hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet();
-}
-
-ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() {
- if (auto threadsRunning = _adjustRunningExecutorThreads(-1);
- threadsRunning.has_value() && threadsRunning.value() == 0) {
- hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet();
- }
-}
-
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index a68994d9fb3..3a96ec30f66 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -38,8 +38,11 @@
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
+#include "mongo/stdx/unordered_map.h"
#include "mongo/transport/service_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/future.h"
#include "mongo/util/hierarchical_acquisition.h"
namespace mongo {
@@ -50,10 +53,17 @@ namespace transport {
* This executor always yields before executing scheduled tasks, and never yields before scheduling
* new tasks (i.e., `ScheduleFlags::kMayYieldBeforeSchedule` is a no-op for this executor).
*/
-class ServiceExecutorFixed : public ServiceExecutor,
- public std::enable_shared_from_this<ServiceExecutorFixed> {
+class ServiceExecutorFixed final : public ServiceExecutor,
+ public std::enable_shared_from_this<ServiceExecutorFixed> {
+ static constexpr auto kDiagnosticLogLevel = 3;
+
+ static const inline auto kInShutdown =
+ Status(ErrorCodes::ServiceExecutorInShutdown, "ServiceExecutorFixed is not running");
+
public:
- explicit ServiceExecutorFixed(ThreadPool::Options options);
+ explicit ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limits limits);
+ explicit ServiceExecutorFixed(ThreadPool::Limits limits)
+ : ServiceExecutorFixed(nullptr, std::move(limits)) {}
virtual ~ServiceExecutorFixed();
static ServiceExecutorFixed* get(ServiceContext* ctx);
@@ -61,6 +71,9 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
+ void schedule(OutOfLineExecutor::Task task) override {
+ _schedule(std::move(task));
+ }
void runOnDataAvailable(const SessionHandle& session,
OutOfLineExecutor::Task onCompletionCallback) override;
@@ -79,57 +92,57 @@ public:
private:
// Maintains the execution state (e.g., recursion depth) for executor threads
- class ExecutorThreadContext {
- public:
- ExecutorThreadContext(std::weak_ptr<ServiceExecutorFixed> serviceExecutor);
- ~ExecutorThreadContext();
-
- ExecutorThreadContext(ExecutorThreadContext&&) = delete;
- ExecutorThreadContext(const ExecutorThreadContext&) = delete;
-
- void run(ServiceExecutor::Task task) {
- // Yield here to improve concurrency, especially when there are more executor threads
- // than CPU cores.
- stdx::this_thread::yield();
- _recursionDepth++;
- task();
- _recursionDepth--;
- }
-
- int getRecursionDepth() const {
- return _recursionDepth;
- }
-
- private:
- boost::optional<int> _adjustRunningExecutorThreads(int adjustment) {
- if (auto executor = _executor.lock()) {
- return executor->_numRunningExecutorThreads.addAndFetch(adjustment);
- }
- return boost::none;
- }
-
- int _recursionDepth = 0;
- std::weak_ptr<ServiceExecutorFixed> _executor;
- };
+ class ExecutorThreadContext;
private:
- AtomicWord<size_t> _numRunningExecutorThreads{0};
- AtomicWord<bool> _canScheduleWork{false};
+ void _checkForShutdown(WithLock);
+ void _schedule(OutOfLineExecutor::Task task) noexcept;
+
+ auto _threadsRunning() const {
+ return _stats.threadsStarted.load() - _stats.threadsEnded.loadRelaxed();
+ }
+
+ auto _tasksRunning() const {
+ return _stats.tasksStarted.load() - _stats.tasksEnded.loadRelaxed();
+ }
+
+ auto _tasksLeft() const {
+ return _stats.tasksScheduled.load() - _stats.tasksEnded.loadRelaxed();
+ }
+
+ struct Stats {
+ AtomicWord<size_t> threadsStarted{0};
+ AtomicWord<size_t> threadsEnded{0};
+
+ AtomicWord<size_t> tasksScheduled{0};
+ AtomicWord<size_t> tasksStarted{0};
+ AtomicWord<size_t> tasksEnded{0};
+ };
+ Stats _stats;
+
+ ServiceContext* const _svcCtx;
mutable Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ServiceExecutorFixed::_mutex");
stdx::condition_variable _shutdownCondition;
+ SharedPromise<void> _shutdownComplete;
/**
- * State transition diagram: kNotStarted ---> kRunning ---> kStopped
- * The service executor cannot be in "kRunning" when its destructor is invoked.
+ * State transition diagram: kNotStarted ---> kRunning ---> kStopping ---> kStopped
*/
- enum State { kNotStarted, kRunning, kStopped } _state = kNotStarted;
+ enum State { kNotStarted, kRunning, kStopping, kStopped } _state = kNotStarted;
ThreadPool::Options _options;
- std::unique_ptr<ThreadPool> _threadPool;
+ std::shared_ptr<ThreadPool> _threadPool;
+
+ struct Waiter {
+ SessionHandle session;
+ OutOfLineExecutor::Task onCompletionCallback;
+ };
+ using WaiterList = std::list<Waiter>;
+ WaiterList _waiters;
- static inline thread_local std::unique_ptr<ExecutorThreadContext> _executorContext;
+ static thread_local std::unique_ptr<ExecutorThreadContext> _executorContext;
};
} // namespace transport
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 4af83003930..be85fb1fb43 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/service_context_test_fixture.h"
#include "mongo/logv2/log.h"
#include "mongo/transport/mock_session.h"
#include "mongo/transport/service_executor_fixed.h"
@@ -168,7 +169,7 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) {
scheduleBasicTask(executor.get(), false);
}
-class ServiceExecutorFixedFixture : public unittest::Test {
+class ServiceExecutorFixedFixture : public ServiceContextTest {
public:
static constexpr auto kNumExecutorThreads = 2;
@@ -183,10 +184,9 @@ public:
ServiceExecutorHandle(ServiceExecutorHandle&&) = delete;
explicit ServiceExecutorHandle(int flags = kNone) : _skipShutdown(flags & kSkipShutdown) {
- ThreadPool::Options options;
- options.minThreads = options.maxThreads = kNumExecutorThreads;
- options.poolName = "Test";
- _executor = std::make_shared<ServiceExecutorFixed>(std::move(options));
+ ThreadPool::Limits limits;
+ limits.minThreads = limits.maxThreads = kNumExecutorThreads;
+ _executor = std::make_shared<ServiceExecutorFixed>(std::move(limits));
if (flags & kStartExecutor) {
ASSERT_OK(_executor->start());
@@ -331,24 +331,42 @@ TEST_F(ServiceExecutorFixedFixture, Stats) {
returnBarrier->countDownAndWait();
}
-TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
+
+TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) {
ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
- std::unique_ptr<stdx::thread> schedulerThread;
+ auto thread = stdx::thread();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
{
- // Spawn a thread to schedule a task, and block it before it can schedule the task with the
- // underlying thread-pool. Then shutdown the service executor and unblock the scheduler
- // thread. This order of events must cause "schedule()" to return a non-okay status.
FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask");
- schedulerThread = std::make_unique<stdx::thread>([executor = *executorHandle] {
- ASSERT_NOT_OK(
- executor->scheduleTask([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
+
+
+ // The executor accepts the work, but hasn't used the underlying pool yet.
+ thread = stdx::thread([&] {
+ ASSERT_OK(executorHandle->scheduleTask([&, barrier] { barrier->countDownAndWait(); },
+ ServiceExecutor::kEmptyFlags));
});
failpoint->waitForTimesEntered(1);
- ASSERT_OK(executorHandle->shutdown(kShutdownTime));
+
+ // Trigger an immediate shutdown which will not affect the task we have accepted.
+ ASSERT_NOT_OK(executorHandle->shutdown(Milliseconds{0}));
}
- schedulerThread->join();
+ // Our failpoint has been disabled, so the task can run to completion.
+ barrier->countDownAndWait();
+
+ // Now we can wait for the task to finish and shutdown.
+ ASSERT_OK(executorHandle->shutdown(kShutdownTime));
+
+ thread.join();
+}
+
+TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
+ ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+
+ ASSERT_OK(executorHandle->shutdown(kShutdownTime));
+ ASSERT_NOT_OK(
+ executorHandle->scheduleTask([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
}
TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) {