author    | Ben Caimano <ben.caimano@10gen.com> | 2019-03-05 13:40:25 -0500
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-04-05 14:28:40 -0400
commit    | a08bac3e29cd9ec3d030623894928e80c2f572dd (patch)
tree      | a82304a07e67909a8d8a40ec08959f4f1c8ccace /src/mongo/executor
parent    | b1aa248f71ffb197c6576fd90ff7571ee9a96c3f (diff)
download  | mongo-a08bac3e29cd9ec3d030623894928e80c2f572dd.tar.gz
SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)
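In short: OutOfLineExecutor::schedule() no longer returns a Status, and Task is now unique_function<void(Status)> rather than unique_function<void()>. An executor that cannot run a task (for example, during shutdown) invokes the task inline with a not-OK Status instead of reporting rejection through a return value, so the work item is always notified exactly once. A minimal standalone sketch of that contract follows; the Executor class, the two-field Status, and std::function are simplified stand-ins for illustration, not the server's types:

    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <utility>

    // Stand-in for mongo::Status, for illustration only.
    struct Status {
        bool ok;
        std::string reason;
        static Status OK() { return {true, {}}; }
        bool isOK() const { return ok; }
    };

    class Executor {
    public:
        // Before this commit: Task = unique_function<void()>,       Status schedule(Task)
        // After this commit:  Task = unique_function<void(Status)>, void   schedule(Task)
        using Task = std::function<void(Status)>;

        void schedule(Task task) {
            std::unique_lock<std::mutex> lk(_mutex);
            if (_inShutdown) {
                // Rejection no longer surfaces as a return value: the task itself
                // runs inline with a not-OK status.
                lk.unlock();
                task({false, "Shutdown in progress"});
                return;
            }
            _tasks.push_back(std::move(task));
        }

        // Normal consumption path: each task is handed Status::OK().
        void drain() {
            std::unique_lock<std::mutex> lk(_mutex);
            while (!_tasks.empty()) {
                Task fn = std::move(_tasks.front());
                _tasks.pop_front();
                lk.unlock();
                fn(Status::OK());
                lk.lock();
            }
        }

        void shutdown() {
            std::lock_guard<std::mutex> lk(_mutex);
            _inShutdown = true;
        }

    private:
        std::mutex _mutex;
        bool _inShutdown = false;
        std::deque<Task> _tasks;
    };

    int main() {
        Executor exec;
        exec.schedule([](Status s) { std::cout << "ran, ok=" << s.isOK() << "\n"; });
        exec.drain();    // prints: ran, ok=1
        exec.shutdown();
        exec.schedule(   // rejected: runs inline with a shutdown status
            [](Status s) { std::cout << "rejected: " << s.reason << "\n"; });
    }

The diffs below apply this shape everywhere: each schedule() implementation gains an inline error path that invokes the task with ShutdownInProgress, and each normal consumption path invokes it with Status::OK().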
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp               |   9
-rw-r--r-- | src/mongo/executor/network_interface_thread_pool.cpp |  11
-rw-r--r-- | src/mongo/executor/network_interface_thread_pool.h   |   2
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp          |  16
-rw-r--r-- | src/mongo/executor/network_interface_tl.h            |   2
-rw-r--r-- | src/mongo/executor/thread_pool_mock.cpp              | 125
-rw-r--r-- | src/mongo/executor/thread_pool_mock.h                |   6
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp     |  28
8 files changed, 112 insertions, 87 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c0c4293e505..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -646,8 +646,9 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
         // If the host and port were dropped, let this lapse
         if (conn->getGeneration() == _generation) {
             addToReady(lk, std::move(conn));
+            fulfillRequests(lk);
         }
-        spawnConnections(lk);
+
         return;
     }
@@ -671,6 +672,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
     } else {
         // If it's fine as it is, just put it in the ready queue
         addToReady(lk, std::move(conn));
+        // TODO This should be scheduled on an executor once we have executor-aware pooling
+        fulfillRequests(lk);
     }
 
     updateStateInLock();
@@ -706,8 +709,6 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
         returnConnection(connPtr, std::move(lk));
     }));
-
-    fulfillRequests(lk);
 }
 
 // Sets state to shutdown and kicks off the failure protocol to tank existing connections
@@ -860,8 +861,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
                 // If the host and port was dropped, let the connection lapse
                 if (conn->getGeneration() == _generation) {
                     addToReady(lk, std::move(conn));
+                    fulfillRequests(lk);
                 }
-                spawnConnections(lk);
             } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
                 // If we've exceeded the time limit, restart the connect, rather than
                 // failing all operations. We do this because the various callers
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index 0fa5ed7632e..787bd0a6dac 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -105,17 +105,18 @@ void NetworkInterfaceThreadPool::join() {
         lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); });
 }
 
-Status NetworkInterfaceThreadPool::schedule(Task task) {
+void NetworkInterfaceThreadPool::schedule(Task task) {
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     if (_inShutdown) {
-        return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+        lk.unlock();
+        task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+        return;
     }
+
     _tasks.emplace_back(std::move(task));
 
     if (_started)
         _consumeTasks(std::move(lk));
-
-    return Status::OK();
 }
 
 /**
@@ -162,7 +163,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut
     const auto lkGuard = makeGuard([&] { lk.lock(); });
 
     for (auto&& task : tasks) {
-        task();
+        task(Status::OK());
     }
 
     tasks.clear();
diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h
index 712ad2d6df7..51771393032 100644
--- a/src/mongo/executor/network_interface_thread_pool.h
+++ b/src/mongo/executor/network_interface_thread_pool.h
@@ -57,7 +57,7 @@ public:
     void startup() override;
     void shutdown() override;
    void join() override;
-    Status schedule(Task task) override;
+    void schedule(Task task) override;
 
 private:
     void _consumeTasks(stdx::unique_lock<stdx::mutex> lk);
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eb808735306..a9529d1c825 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
             StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
             makeReadyFutureWith([&] {
-                return _onAcquireConn(
-                    state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
+                auto connHandle = uassertStatusOK(std::move(swConn));
+                return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
             })
                 .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
                     // The TransportLayer has, for historical reasons returned SocketException for
@@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
 
-    _reactor->schedule([action = std::move(action)]() { action(Status::OK()); });
+    _reactor->schedule([action = std::move(action)](auto status) { action(status); });
     return Status::OK();
 }
 
@@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
     }
 
     if (when <= now()) {
-        _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); });
+        _reactor->schedule([action = std::move(action)](auto status) { action(status); });
         return Status::OK();
     }
 
@@ -569,7 +569,13 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
     }
 
     // Fulfill the promise on a reactor thread
-    _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); });
+    _reactor->schedule([state](auto status) {
+        if (status.isOK()) {
+            state->promise.emplaceValue();
+        } else {
+            state->promise.setError(status);
+        }
+    });
 }
 
 bool NetworkInterfaceTL::onNetworkThread() {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index df634ed3fb4..2ab75f49aaa 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -102,7 +102,7 @@ private:
         transport::ReactorHandle reactor;
 
         void operator()(ConnectionPool::ConnectionInterface* ptr) const {
-            reactor->dispatch([ ret = returner, ptr ] { ret(ptr); });
+            reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
         }
     };
     using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
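The _answerAlarm hunk above is where the new Task signature pays off most directly: the lambda that fulfills the alarm's promise now receives the reactor's scheduling status, so a reactor that refuses the task (for example, during shutdown) fails the promise instead of leaving it unfulfilled and its waiters blocked. A reduced sketch of that pattern, using std::promise and hypothetical AlarmState/reactor stand-ins rather than the server's Future internals:

    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <stdexcept>

    // Illustrative stand-ins, not the server's types.
    struct Status {
        bool ok;
        bool isOK() const { return ok; }
    };

    struct AlarmState {
        std::promise<void> promise;  // a waiter blocks on the matching future
    };

    // Mirrors the _answerAlarm hunk: branch on the status handed to the task.
    void answerAlarm(const std::function<void(std::function<void(Status)>)>& reactorSchedule,
                     std::shared_ptr<AlarmState> state) {
        reactorSchedule([state](Status status) {
            if (status.isOK()) {
                state->promise.set_value();  // alarm fired normally
            } else {
                // The reactor rejected the task: fail the waiter instead of hanging it.
                state->promise.set_exception(
                    std::make_exception_ptr(std::runtime_error("reactor shutdown")));
            }
        });
    }

    int main() {
        auto state = std::make_shared<AlarmState>();
        auto done = state->promise.get_future();
        // Simulate a shut-down reactor: it runs the task inline with a not-OK status.
        answerAlarm([](std::function<void(Status)> task) { task(Status{false}); }, state);
        try {
            done.get();
        } catch (const std::runtime_error& e) {
            std::cout << "waiter unblocked with error: " << e.what() << "\n";
        }
    }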
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index a2ffbbac2ba..191537cebff 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -44,19 +44,11 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
 
 ThreadPoolMock::~ThreadPoolMock() {
     stdx::unique_lock<stdx::mutex> lk(_mutex);
-    _inShutdown = true;
-    _net->signalWorkAvailable();
-    _net->exitNetwork();
-    if (_started) {
-        if (_worker.joinable()) {
-            lk.unlock();
-            _worker.join();
-            lk.lock();
-        }
-    } else {
-        consumeTasks(&lk);
-    }
-    invariant(_tasks.empty());
+    if (_joining)
+        return;
+
+    _shutdown(lk);
+    _join(lk);
 }
 
 void ThreadPoolMock::startup() {
@@ -68,74 +60,87 @@ void ThreadPoolMock::startup() {
     _worker = stdx::thread([this] {
         _options.onCreateThread();
         stdx::unique_lock<stdx::mutex> lk(_mutex);
-        consumeTasks(&lk);
+
+        LOG(1) << "Starting to consume tasks";
+        while (!_joining) {
+            if (_tasks.empty()) {
+                lk.unlock();
+                _net->waitForWork();
+                lk.lock();
+                continue;
+            }
+
+            _consumeOneTask(lk);
+        }
+        LOG(1) << "Done consuming tasks";
     });
 }
 
 void ThreadPoolMock::shutdown() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
-    _inShutdown = true;
-    _net->signalWorkAvailable();
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+    _shutdown(lk);
 }
 
 void ThreadPoolMock::join() {
     stdx::unique_lock<stdx::mutex> lk(_mutex);
-    _joining = true;
-    if (_started) {
-        stdx::thread toJoin = std::move(_worker);
-        _net->signalWorkAvailable();
-        _net->exitNetwork();
+    _join(lk);
+}
+
+void ThreadPoolMock::schedule(Task task) {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+    if (_inShutdown) {
         lk.unlock();
-        toJoin.join();
-        lk.lock();
-        invariant(_tasks.empty());
-    } else {
-        consumeTasks(&lk);
-        invariant(_tasks.empty());
+
+        task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+        return;
     }
+
+    _tasks.emplace_back(std::move(task));
 }
 
-Status ThreadPoolMock::schedule(Task task) {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
+void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
+    auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+    if (next + 1 != _tasks.size()) {
+        std::swap(_tasks[next], _tasks.back());
+    }
+    Task fn = std::move(_tasks.back());
+    _tasks.pop_back();
+    lk.unlock();
     if (_inShutdown) {
-        return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+        fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
+    } else {
+        fn(Status::OK());
     }
-    _tasks.emplace_back(std::move(task));
-    return Status::OK();
+    lk.lock();
 }
 
-void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
-    using std::swap;
+void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
+    LOG(1) << "Shutting down pool";
 
-    LOG(1) << "Starting to consume tasks";
-    while (!(_inShutdown && _tasks.empty())) {
-        if (_tasks.empty()) {
-            lk->unlock();
-            _net->waitForWork();
-            lk->lock();
-            continue;
-        }
-        auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
-        if (next + 1 != _tasks.size()) {
-            swap(_tasks[next], _tasks.back());
-        }
-        Task fn = std::move(_tasks.back());
-        _tasks.pop_back();
-        lk->unlock();
-        fn();
-        lk->lock();
-    }
-    LOG(1) << "Done consuming tasks";
+    _inShutdown = true;
+    _net->signalWorkAvailable();
+}
 
-    invariant(_tasks.empty());
+void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
+    LOG(1) << "Joining pool";
+
+    _joining = true;
+    _net->signalWorkAvailable();
+    _net->exitNetwork();
 
-    while (_started && !_joining) {
-        lk->unlock();
-        _net->waitForWork();
-        lk->lock();
+    // Since there is only one worker thread, we need to consume tasks here to potentially
+    // unblock that thread.
+    while (!_tasks.empty()) {
+        _consumeOneTask(lk);
     }
-    LOG(1) << "Ready to join";
+
+    if (_started) {
+        lk.unlock();
+        _worker.join();
+        lk.lock();
+    }
+
+    invariant(_tasks.empty());
 }
 
 }  // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index 3d510ffa3a9..e3baaa07273 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -70,10 +70,12 @@ public:
     void startup() override;
     void shutdown() override;
     void join() override;
-    Status schedule(Task task) override;
+    void schedule(Task task) override;
 
 private:
-    void consumeTasks(stdx::unique_lock<stdx::mutex>* lk);
+    void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk);
+    void _shutdown(stdx::unique_lock<stdx::mutex>& lk);
+    void _join(stdx::unique_lock<stdx::mutex>& lk);
 
     // These are the options with which the pool was configured at construction time.
     const Options _options;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index fefecf9a04e..9496b88d21f 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -597,7 +597,9 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
 
     if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
         scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
-        while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
+
+        auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); };
+        while (!ErrorCodes::isCancelationError(checkStatus().code())) {
             sleepmillis(100);
         }
     }
@@ -611,16 +613,24 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
             }
 
             cbState->canceled.store(1);
-            const auto status =
-                _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
-            invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
+            _pool->schedule([this, cbState](auto status) {
+                invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
+
+                runCallback(std::move(cbState));
+            });
         });
     } else {
-        const auto status =
-            _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
-        if (status == ErrorCodes::ShutdownInProgress)
-            break;
-        fassert(28735, status);
+        _pool->schedule([this, cbState](auto status) {
+            if (ErrorCodes::isCancelationError(status.code())) {
+                stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+                cbState->canceled.store(1);
+            } else {
+                fassert(28735, status);
+            }
+
+            runCallback(std::move(cbState));
+        });
     }
 }
 _net->signalWorkAvailable();
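The thread_pool_task_executor.cpp hunks show the caller-side half of the migration: error handling moves from the call site, which used to inspect schedule()'s return value, into the scheduled task itself, which now always runs exactly once and inspects the status it is handed. A sketch of that shape, reusing the Executor and Status stand-ins from the sketch under the commit message (runCallback and the canceled flag are hypothetical placeholders for the executor's callback state):

    #include <atomic>
    #include <iostream>

    std::atomic<int> canceled{0};  // placeholder for cbState->canceled

    void runCallback() {
        std::cout << "callback ran, canceled=" << canceled.load() << "\n";
    }

    void scheduleCallback(Executor& pool) {
        // Before: const auto status = pool->schedule(...);
        //         if (status == ErrorCodes::ShutdownInProgress) break;
        // After: the task observes cancelation itself and still runs the callback.
        pool.schedule([](Status status) {
            if (!status.isOK()) {
                canceled.store(1);  // mirrors cbState->canceled.store(1) in the diff
            }
            runCallback();
        });
    }

    int main() {
        Executor pool;
        scheduleCallback(pool);
        pool.drain();            // prints: callback ran, canceled=0
        pool.shutdown();
        scheduleCallback(pool);  // rejected, runs inline: callback ran, canceled=1
    }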