diff options
author | Benety Goh <benety@mongodb.com> | 2019-04-03 00:53:07 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2019-04-03 00:53:07 -0400 |
commit | 6c2bd4b1be257ba7b9335e40c2af18ff25b7fcdd (patch) | |
tree | e5e7d931989f7bdacf515e9f53f29d6a3837c6ee /src/mongo/executor | |
parent | ddae7b803ed19bf4bc1af1dcf0f8d4e44575736c (diff) | |
download | mongo-6c2bd4b1be257ba7b9335e40c2af18ff25b7fcdd.tar.gz |
Revert "SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)"
This reverts commit 04ea1d46eb6c4c78e19409f120ae2e61f2a35204.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 9 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_thread_pool.cpp | 11 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_thread_pool.h | 2 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 2 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_mock.cpp | 125 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 28 |
8 files changed, 87 insertions, 112 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 41e4e602c65..c0c4293e505 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -646,9 +646,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr // If the host and port were dropped, let this lapse if (conn->getGeneration() == _generation) { addToReady(lk, std::move(conn)); - fulfillRequests(lk); } - + spawnConnections(lk); return; } @@ -672,8 +671,6 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr } else { // If it's fine as it is, just put it in the ready queue addToReady(lk, std::move(conn)); - // TODO This should be scheduled on an executor once we have executor-aware pooling - fulfillRequests(lk); } updateStateInLock(); @@ -709,6 +706,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk returnConnection(connPtr, std::move(lk)); })); + + fulfillRequests(lk); } // Sets state to shutdown and kicks off the failure protocol to tank existing connections @@ -861,8 +860,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute // If the host and port was dropped, let the connection lapse if (conn->getGeneration() == _generation) { addToReady(lk, std::move(conn)); - fulfillRequests(lk); } + spawnConnections(lk); } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { // If we've exceeded the time limit, restart the connect, rather than // failing all operations. We do this because the various callers diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp index 787bd0a6dac..0fa5ed7632e 100644 --- a/src/mongo/executor/network_interface_thread_pool.cpp +++ b/src/mongo/executor/network_interface_thread_pool.cpp @@ -105,18 +105,17 @@ void NetworkInterfaceThreadPool::join() { lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); }); } -void NetworkInterfaceThreadPool::schedule(Task task) { +Status NetworkInterfaceThreadPool::schedule(Task task) { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_inShutdown) { - lk.unlock(); - task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); - return; + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } - _tasks.emplace_back(std::move(task)); if (_started) _consumeTasks(std::move(lk)); + + return Status::OK(); } /** @@ -163,7 +162,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut const auto lkGuard = makeGuard([&] { lk.lock(); }); for (auto&& task : tasks) { - task(Status::OK()); + task(); } tasks.clear(); diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h index 51771393032..712ad2d6df7 100644 --- a/src/mongo/executor/network_interface_thread_pool.h +++ b/src/mongo/executor/network_interface_thread_pool.h @@ -57,7 +57,7 @@ public: void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; + Status schedule(Task task) override; private: void _consumeTasks(stdx::unique_lock<stdx::mutex> lk); diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index a9529d1c825..eb808735306 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ]( StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable { makeReadyFutureWith([&] { - auto connHandle = uassertStatusOK(std::move(swConn)); - return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton); + return _onAcquireConn( + state, std::move(future), std::move(*uassertStatusOK(swConn)), baton); }) .onError([](Status error) -> StatusWith<RemoteCommandResponse> { // The TransportLayer has, for historical reasons returned SocketException for @@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) { return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; } - _reactor->schedule([action = std::move(action)](auto status) { action(status); }); + _reactor->schedule([action = std::move(action)]() { action(Status::OK()); }); return Status::OK(); } @@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle } if (when <= now()) { - _reactor->schedule([action = std::move(action)](auto status) { action(status); }); + _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); }); return Status::OK(); } @@ -569,13 +569,7 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState> } // Fulfill the promise on a reactor thread - _reactor->schedule([state](auto status) { - if (status.isOK()) { - state->promise.emplaceValue(); - } else { - state->promise.setError(status); - } - }); + _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); }); } bool NetworkInterfaceTL::onNetworkThread() { diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 2ab75f49aaa..df634ed3fb4 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -102,7 +102,7 @@ private: transport::ReactorHandle reactor; void operator()(ConnectionPool::ConnectionInterface* ptr) const { - reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); }); + reactor->dispatch([ ret = returner, ptr ] { ret(ptr); }); } }; using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>; diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index 191537cebff..a2ffbbac2ba 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -44,11 +44,19 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_joining) - return; - - _shutdown(lk); - _join(lk); + _inShutdown = true; + _net->signalWorkAvailable(); + _net->exitNetwork(); + if (_started) { + if (_worker.joinable()) { + lk.unlock(); + _worker.join(); + lk.lock(); + } + } else { + consumeTasks(&lk); + } + invariant(_tasks.empty()); } void ThreadPoolMock::startup() { @@ -60,87 +68,74 @@ void ThreadPoolMock::startup() { _worker = stdx::thread([this] { _options.onCreateThread(); stdx::unique_lock<stdx::mutex> lk(_mutex); - - LOG(1) << "Starting to consume tasks"; - while (!_joining) { - if (_tasks.empty()) { - lk.unlock(); - _net->waitForWork(); - lk.lock(); - continue; - } - - _consumeOneTask(lk); - } - LOG(1) << "Done consuming tasks"; + consumeTasks(&lk); }); } void ThreadPoolMock::shutdown() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _shutdown(lk); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + _net->signalWorkAvailable(); } void ThreadPoolMock::join() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _join(lk); -} - -void ThreadPoolMock::schedule(Task task) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_inShutdown) { + _joining = true; + if (_started) { + stdx::thread toJoin = std::move(_worker); + _net->signalWorkAvailable(); + _net->exitNetwork(); lk.unlock(); - - task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); - return; + toJoin.join(); + lk.lock(); + invariant(_tasks.empty()); + } else { + consumeTasks(&lk); + invariant(_tasks.empty()); } - - _tasks.emplace_back(std::move(task)); } -void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) { - auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); - if (next + 1 != _tasks.size()) { - std::swap(_tasks[next], _tasks.back()); - } - Task fn = std::move(_tasks.back()); - _tasks.pop_back(); - lk.unlock(); +Status ThreadPoolMock::schedule(Task task) { + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_inShutdown) { - fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"}); - } else { - fn(Status::OK()); + return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } - lk.lock(); -} - -void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) { - LOG(1) << "Shutting down pool"; - - _inShutdown = true; - _net->signalWorkAvailable(); + _tasks.emplace_back(std::move(task)); + return Status::OK(); } -void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) { - LOG(1) << "Joining pool"; +void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) { + using std::swap; - _joining = true; - _net->signalWorkAvailable(); - _net->exitNetwork(); - - // Since there is only one worker thread, we need to consume tasks here to potentially - // unblock that thread. - while (!_tasks.empty()) { - _consumeOneTask(lk); + LOG(1) << "Starting to consume tasks"; + while (!(_inShutdown && _tasks.empty())) { + if (_tasks.empty()) { + lk->unlock(); + _net->waitForWork(); + lk->lock(); + continue; + } + auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size()))); + if (next + 1 != _tasks.size()) { + swap(_tasks[next], _tasks.back()); + } + Task fn = std::move(_tasks.back()); + _tasks.pop_back(); + lk->unlock(); + fn(); + lk->lock(); } + LOG(1) << "Done consuming tasks"; - if (_started) { - lk.unlock(); - _worker.join(); - lk.lock(); + invariant(_tasks.empty()); + + while (_started && !_joining) { + lk->unlock(); + _net->waitForWork(); + lk->lock(); } - invariant(_tasks.empty()); + LOG(1) << "Ready to join"; } } // namespace executor diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h index e3baaa07273..3d510ffa3a9 100644 --- a/src/mongo/executor/thread_pool_mock.h +++ b/src/mongo/executor/thread_pool_mock.h @@ -70,12 +70,10 @@ public: void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; + Status schedule(Task task) override; private: - void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk); - void _shutdown(stdx::unique_lock<stdx::mutex>& lk); - void _join(stdx::unique_lock<stdx::mutex>& lk); + void consumeTasks(stdx::unique_lock<stdx::mutex>* lk); // These are the options with which the pool was configured at construction time. const Options _options; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 9496b88d21f..fefecf9a04e 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -597,9 +597,7 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) { scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off); - - auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); }; - while (!ErrorCodes::isCancelationError(checkStatus().code())) { + while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) { sleepmillis(100); } } @@ -613,24 +611,16 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, } cbState->canceled.store(1); - _pool->schedule([this, cbState](auto status) { - invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); - - runCallback(std::move(cbState)); - }); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress); }); } else { - _pool->schedule([this, cbState](auto status) { - if (ErrorCodes::isCancelationError(status.code())) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - cbState->canceled.store(1); - } else { - fassert(28735, status); - } - - runCallback(std::move(cbState)); - }); + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + if (status == ErrorCodes::ShutdownInProgress) + break; + fassert(28735, status); } } _net->signalWorkAvailable(); |