summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-03-05 13:40:25 -0500
committerBen Caimano <ben.caimano@10gen.com>2019-04-05 14:28:40 -0400
commita08bac3e29cd9ec3d030623894928e80c2f572dd (patch)
treea82304a07e67909a8d8a40ec08959f4f1c8ccace /src/mongo/executor
parentb1aa248f71ffb197c6576fd90ff7571ee9a96c3f (diff)
downloadmongo-a08bac3e29cd9ec3d030623894928e80c2f572dd.tar.gz
SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/connection_pool.cpp9
-rw-r--r--src/mongo/executor/network_interface_thread_pool.cpp11
-rw-r--r--src/mongo/executor/network_interface_thread_pool.h2
-rw-r--r--src/mongo/executor/network_interface_tl.cpp16
-rw-r--r--src/mongo/executor/network_interface_tl.h2
-rw-r--r--src/mongo/executor/thread_pool_mock.cpp125
-rw-r--r--src/mongo/executor/thread_pool_mock.h6
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp28
8 files changed, 112 insertions, 87 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c0c4293e505..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -646,8 +646,9 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// If the host and port were dropped, let this lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
}
- spawnConnections(lk);
+
return;
}
@@ -671,6 +672,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
+ // TODO This should be scheduled on an executor once we have executor-aware pooling
+ fulfillRequests(lk);
}
updateStateInLock();
@@ -706,8 +709,6 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
returnConnection(connPtr, std::move(lk));
}));
-
- fulfillRequests(lk);
}
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
@@ -860,8 +861,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// If the host and port was dropped, let the connection lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
}
- spawnConnections(lk);
} else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
// If we've exceeded the time limit, restart the connect, rather than
// failing all operations. We do this because the various callers
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index 0fa5ed7632e..787bd0a6dac 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -105,17 +105,18 @@ void NetworkInterfaceThreadPool::join() {
lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); });
}
-Status NetworkInterfaceThreadPool::schedule(Task task) {
+void NetworkInterfaceThreadPool::schedule(Task task) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ lk.unlock();
+ task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ return;
}
+
_tasks.emplace_back(std::move(task));
if (_started)
_consumeTasks(std::move(lk));
-
- return Status::OK();
}
/**
@@ -162,7 +163,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut
const auto lkGuard = makeGuard([&] { lk.lock(); });
for (auto&& task : tasks) {
- task();
+ task(Status::OK());
}
tasks.clear();
diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h
index 712ad2d6df7..51771393032 100644
--- a/src/mongo/executor/network_interface_thread_pool.h
+++ b/src/mongo/executor/network_interface_thread_pool.h
@@ -57,7 +57,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- Status schedule(Task task) override;
+ void schedule(Task task) override;
private:
void _consumeTasks(stdx::unique_lock<stdx::mutex> lk);
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eb808735306..a9529d1c825 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
[ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
- return _onAcquireConn(
- state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
+ auto connHandle = uassertStatusOK(std::move(swConn));
+ return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
})
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons returned SocketException for
@@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
- _reactor->schedule([action = std::move(action)]() { action(Status::OK()); });
+ _reactor->schedule([action = std::move(action)](auto status) { action(status); });
return Status::OK();
}
@@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
}
if (when <= now()) {
- _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); });
+ _reactor->schedule([action = std::move(action)](auto status) { action(status); });
return Status::OK();
}
@@ -569,7 +569,13 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
}
// Fulfill the promise on a reactor thread
- _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); });
+ _reactor->schedule([state](auto status) {
+ if (status.isOK()) {
+ state->promise.emplaceValue();
+ } else {
+ state->promise.setError(status);
+ }
+ });
}
bool NetworkInterfaceTL::onNetworkThread() {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index df634ed3fb4..2ab75f49aaa 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -102,7 +102,7 @@ private:
transport::ReactorHandle reactor;
void operator()(ConnectionPool::ConnectionInterface* ptr) const {
- reactor->dispatch([ ret = returner, ptr ] { ret(ptr); });
+ reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
}
};
using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index a2ffbbac2ba..191537cebff 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -44,19 +44,11 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _net->signalWorkAvailable();
- _net->exitNetwork();
- if (_started) {
- if (_worker.joinable()) {
- lk.unlock();
- _worker.join();
- lk.lock();
- }
- } else {
- consumeTasks(&lk);
- }
- invariant(_tasks.empty());
+ if (_joining)
+ return;
+
+ _shutdown(lk);
+ _join(lk);
}
void ThreadPoolMock::startup() {
@@ -68,74 +60,87 @@ void ThreadPoolMock::startup() {
_worker = stdx::thread([this] {
_options.onCreateThread();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- consumeTasks(&lk);
+
+ LOG(1) << "Starting to consume tasks";
+ while (!_joining) {
+ if (_tasks.empty()) {
+ lk.unlock();
+ _net->waitForWork();
+ lk.lock();
+ continue;
+ }
+
+ _consumeOneTask(lk);
+ }
+ LOG(1) << "Done consuming tasks";
});
}
void ThreadPoolMock::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _net->signalWorkAvailable();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _shutdown(lk);
}
void ThreadPoolMock::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _joining = true;
- if (_started) {
- stdx::thread toJoin = std::move(_worker);
- _net->signalWorkAvailable();
- _net->exitNetwork();
+ _join(lk);
+}
+
+void ThreadPoolMock::schedule(Task task) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
lk.unlock();
- toJoin.join();
- lk.lock();
- invariant(_tasks.empty());
- } else {
- consumeTasks(&lk);
- invariant(_tasks.empty());
+
+ task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ return;
}
+
+ _tasks.emplace_back(std::move(task));
}
-Status ThreadPoolMock::schedule(Task task) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
+ auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+ if (next + 1 != _tasks.size()) {
+ std::swap(_tasks[next], _tasks.back());
+ }
+ Task fn = std::move(_tasks.back());
+ _tasks.pop_back();
+ lk.unlock();
if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+ } else {
+ fn(Status::OK());
}
- _tasks.emplace_back(std::move(task));
- return Status::OK();
+ lk.lock();
}
-void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
- using std::swap;
+void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
+ LOG(1) << "Shutting down pool";
- LOG(1) << "Starting to consume tasks";
- while (!(_inShutdown && _tasks.empty())) {
- if (_tasks.empty()) {
- lk->unlock();
- _net->waitForWork();
- lk->lock();
- continue;
- }
- auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
- if (next + 1 != _tasks.size()) {
- swap(_tasks[next], _tasks.back());
- }
- Task fn = std::move(_tasks.back());
- _tasks.pop_back();
- lk->unlock();
- fn();
- lk->lock();
- }
- LOG(1) << "Done consuming tasks";
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+}
- invariant(_tasks.empty());
+void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
+ LOG(1) << "Joining pool";
+
+ _joining = true;
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
- while (_started && !_joining) {
- lk->unlock();
- _net->waitForWork();
- lk->lock();
+ // Since there is only one worker thread, we need to consume tasks here to potentially
+ // unblock that thread.
+ while (!_tasks.empty()) {
+ _consumeOneTask(lk);
}
- LOG(1) << "Ready to join";
+ if (_started) {
+ lk.unlock();
+ _worker.join();
+ lk.lock();
+ }
+
+ invariant(_tasks.empty());
}
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index 3d510ffa3a9..e3baaa07273 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -70,10 +70,12 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- Status schedule(Task task) override;
+ void schedule(Task task) override;
private:
- void consumeTasks(stdx::unique_lock<stdx::mutex>* lk);
+ void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk);
+ void _shutdown(stdx::unique_lock<stdx::mutex>& lk);
+ void _join(stdx::unique_lock<stdx::mutex>& lk);
// These are the options with which the pool was configured at construction time.
const Options _options;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index fefecf9a04e..9496b88d21f 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -597,7 +597,9 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
- while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
+
+ auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); };
+ while (!ErrorCodes::isCancelationError(checkStatus().code())) {
sleepmillis(100);
}
}
@@ -611,16 +613,24 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
cbState->canceled.store(1);
- const auto status =
- _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
+ _pool->schedule([this, cbState](auto status) {
+ invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
+
+ runCallback(std::move(cbState));
+ });
});
} else {
- const auto status =
- _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- if (status == ErrorCodes::ShutdownInProgress)
- break;
- fassert(28735, status);
+ _pool->schedule([this, cbState](auto status) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ cbState->canceled.store(1);
+ } else {
+ fassert(28735, status);
+ }
+
+ runCallback(std::move(cbState));
+ });
}
}
_net->signalWorkAvailable();