author     Benety Goh <benety@mongodb.com>   2019-04-03 00:53:07 -0400
committer  Benety Goh <benety@mongodb.com>   2019-04-03 00:53:07 -0400
commit     6c2bd4b1be257ba7b9335e40c2af18ff25b7fcdd (patch)
tree       e5e7d931989f7bdacf515e9f53f29d6a3837c6ee /src/mongo/executor
parent     ddae7b803ed19bf4bc1af1dcf0f8d4e44575736c (diff)
download   mongo-6c2bd4b1be257ba7b9335e40c2af18ff25b7fcdd.tar.gz
Revert "SERVER-39965 OutOfLineExecutor Tasks are now unique_function(Status)"
This reverts commit 04ea1d46eb6c4c78e19409f120ae2e61f2a35204.
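For reference, the interface change being undone looks roughly like the sketch
below. SimpleStatus, StatusTaskExecutor, and PlainTaskExecutor are illustrative
stand-ins only (queueing and worker threads omitted); they are not mongo::Status
or the executor types touched in this diff, and exist purely to contrast the two
shapes of Task and schedule().

    // Sketch only: hypothetical stand-ins, not MongoDB code.
    #include <functional>
    #include <iostream>
    #include <string>

    struct SimpleStatus {
        bool ok;
        std::string reason;
    };

    // Shape introduced by SERVER-39965 (the one this commit reverts):
    // a Task receives a Status, and schedule() cannot refuse work --
    // on shutdown it runs the task with a ShutdownInProgress-style error.
    struct StatusTaskExecutor {
        using Task = std::function<void(SimpleStatus)>;
        bool inShutdown = false;

        void schedule(Task task) {
            if (inShutdown) {
                task({false, "Shutdown in progress"});
                return;
            }
            task({true, ""});
        }
    };

    // Shape restored by this revert: a Task takes no arguments, and
    // schedule() reports rejection through its return value.
    struct PlainTaskExecutor {
        using Task = std::function<void()>;
        bool inShutdown = false;

        SimpleStatus schedule(Task task) {
            if (inShutdown) {
                return {false, "Shutdown in progress"};
            }
            task();
            return {true, ""};
        }
    };

    int main() {
        StatusTaskExecutor reverted;
        reverted.schedule([](SimpleStatus s) {
            std::cout << "reverted shape: task saw ok=" << s.ok << '\n';
        });

        PlainTaskExecutor restored;
        SimpleStatus s = restored.schedule([] {
            std::cout << "restored shape: task ran\n";
        });
        std::cout << "restored shape: schedule returned ok=" << s.ok << '\n';
        return 0;
    }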
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--  src/mongo/executor/connection_pool.cpp                  9
-rw-r--r--  src/mongo/executor/network_interface_thread_pool.cpp   11
-rw-r--r--  src/mongo/executor/network_interface_thread_pool.h      2
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp             16
-rw-r--r--  src/mongo/executor/network_interface_tl.h                2
-rw-r--r--  src/mongo/executor/thread_pool_mock.cpp                125
-rw-r--r--  src/mongo/executor/thread_pool_mock.h                    6
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp        28
8 files changed, 87 insertions(+), 112 deletions(-)
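The thread_pool_task_executor.cpp hunks at the end of this diff show the
caller-side consequence of the revert: with a Status-returning schedule(),
callers branch on the returned Status instead of handling an error Status
inside the task body. A rough sketch of that pattern (the Pool type and
names here are assumptions for illustration, not code from the tree):

    // Hypothetical caller of a Status-returning schedule(); Pool is any
    // type shaped like the PlainTaskExecutor sketch above.
    #include <iostream>

    template <typename Pool>
    void scheduleCallback(Pool& pool) {
        const auto status = pool.schedule([] {
            // runCallback(...) would go here in the real executor.
            std::cout << "callback ran\n";
        });
        if (!status.ok) {
            // The real code tolerates only ShutdownInProgress here;
            // any other failure would be a fatal assertion.
            std::cout << "schedule refused: " << status.reason << '\n';
        }
    }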
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 41e4e602c65..c0c4293e505 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -646,9 +646,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// If the host and port were dropped, let this lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
- fulfillRequests(lk);
}
-
+ spawnConnections(lk);
return;
}
@@ -672,8 +671,6 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
- // TODO This should be scheduled on an executor once we have executor-aware pooling
- fulfillRequests(lk);
}
updateStateInLock();
@@ -709,6 +706,8 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
returnConnection(connPtr, std::move(lk));
}));
+
+ fulfillRequests(lk);
}
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
@@ -861,8 +860,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// If the host and port was dropped, let the connection lapse
if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
- fulfillRequests(lk);
}
+ spawnConnections(lk);
} else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
// If we've exceeded the time limit, restart the connect, rather than
// failing all operations. We do this because the various callers
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index 787bd0a6dac..0fa5ed7632e 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -105,18 +105,17 @@ void NetworkInterfaceThreadPool::join() {
lk, [&] { return _tasks.empty() && (_consumeState == ConsumeState::kNeutral); });
}
-void NetworkInterfaceThreadPool::schedule(Task task) {
+Status NetworkInterfaceThreadPool::schedule(Task task) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown) {
- lk.unlock();
- task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
- return;
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
-
_tasks.emplace_back(std::move(task));
if (_started)
_consumeTasks(std::move(lk));
+
+ return Status::OK();
}
/**
@@ -163,7 +162,7 @@ void NetworkInterfaceThreadPool::_consumeTasksInline(stdx::unique_lock<stdx::mut
const auto lkGuard = makeGuard([&] { lk.lock(); });
for (auto&& task : tasks) {
- task(Status::OK());
+ task();
}
tasks.clear();
diff --git a/src/mongo/executor/network_interface_thread_pool.h b/src/mongo/executor/network_interface_thread_pool.h
index 51771393032..712ad2d6df7 100644
--- a/src/mongo/executor/network_interface_thread_pool.h
+++ b/src/mongo/executor/network_interface_thread_pool.h
@@ -57,7 +57,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
+ Status schedule(Task task) override;
private:
void _consumeTasks(stdx::unique_lock<stdx::mutex> lk);
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index a9529d1c825..eb808735306 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -253,8 +253,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
[ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
- auto connHandle = uassertStatusOK(std::move(swConn));
- return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
+ return _onAcquireConn(
+ state, std::move(future), std::move(*uassertStatusOK(swConn)), baton);
})
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons returned SocketException for
@@ -456,7 +456,7 @@ Status NetworkInterfaceTL::schedule(unique_function<void(Status)> action) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
- _reactor->schedule([action = std::move(action)](auto status) { action(status); });
+ _reactor->schedule([action = std::move(action)]() { action(Status::OK()); });
return Status::OK();
}
@@ -468,7 +468,7 @@ Status NetworkInterfaceTL::setAlarm(const TaskExecutor::CallbackHandle& cbHandle
}
if (when <= now()) {
- _reactor->schedule([action = std::move(action)](auto status) { action(status); });
+ _reactor->schedule([action = std::move(action)]()->void { action(Status::OK()); });
return Status::OK();
}
@@ -569,13 +569,7 @@ void NetworkInterfaceTL::_answerAlarm(Status status, std::shared_ptr<AlarmState>
}
// Fulfill the promise on a reactor thread
- _reactor->schedule([state](auto status) {
- if (status.isOK()) {
- state->promise.emplaceValue();
- } else {
- state->promise.setError(status);
- }
- });
+ _reactor->schedule([state = std::move(state)]() { state->promise.emplaceValue(); });
}
bool NetworkInterfaceTL::onNetworkThread() {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 2ab75f49aaa..df634ed3fb4 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -102,7 +102,7 @@ private:
transport::ReactorHandle reactor;
void operator()(ConnectionPool::ConnectionInterface* ptr) const {
- reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
+ reactor->dispatch([ ret = returner, ptr ] { ret(ptr); });
}
};
using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index 191537cebff..a2ffbbac2ba 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -44,11 +44,19 @@ ThreadPoolMock::ThreadPoolMock(NetworkInterfaceMock* net, int32_t prngSeed, Opti
ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_joining)
- return;
-
- _shutdown(lk);
- _join(lk);
+ _inShutdown = true;
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
+ if (_started) {
+ if (_worker.joinable()) {
+ lk.unlock();
+ _worker.join();
+ lk.lock();
+ }
+ } else {
+ consumeTasks(&lk);
+ }
+ invariant(_tasks.empty());
}
void ThreadPoolMock::startup() {
@@ -60,87 +68,74 @@ void ThreadPoolMock::startup() {
_worker = stdx::thread([this] {
_options.onCreateThread();
stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- LOG(1) << "Starting to consume tasks";
- while (!_joining) {
- if (_tasks.empty()) {
- lk.unlock();
- _net->waitForWork();
- lk.lock();
- continue;
- }
-
- _consumeOneTask(lk);
- }
- LOG(1) << "Done consuming tasks";
+ consumeTasks(&lk);
});
}
void ThreadPoolMock::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _shutdown(lk);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ _net->signalWorkAvailable();
}
void ThreadPoolMock::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _join(lk);
-}
-
-void ThreadPoolMock::schedule(Task task) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ _joining = true;
+ if (_started) {
+ stdx::thread toJoin = std::move(_worker);
+ _net->signalWorkAvailable();
+ _net->exitNetwork();
lk.unlock();
-
- task({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
- return;
+ toJoin.join();
+ lk.lock();
+ invariant(_tasks.empty());
+ } else {
+ consumeTasks(&lk);
+ invariant(_tasks.empty());
}
-
- _tasks.emplace_back(std::move(task));
}
-void ThreadPoolMock::_consumeOneTask(stdx::unique_lock<stdx::mutex>& lk) {
- auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
- if (next + 1 != _tasks.size()) {
- std::swap(_tasks[next], _tasks.back());
- }
- Task fn = std::move(_tasks.back());
- _tasks.pop_back();
- lk.unlock();
+Status ThreadPoolMock::schedule(Task task) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_inShutdown) {
- fn({ErrorCodes::ShutdownInProgress, "Shutdown in progress"});
- } else {
- fn(Status::OK());
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
- lk.lock();
-}
-
-void ThreadPoolMock::_shutdown(stdx::unique_lock<stdx::mutex>& lk) {
- LOG(1) << "Shutting down pool";
-
- _inShutdown = true;
- _net->signalWorkAvailable();
+ _tasks.emplace_back(std::move(task));
+ return Status::OK();
}
-void ThreadPoolMock::_join(stdx::unique_lock<stdx::mutex>& lk) {
- LOG(1) << "Joining pool";
+void ThreadPoolMock::consumeTasks(stdx::unique_lock<stdx::mutex>* lk) {
+ using std::swap;
- _joining = true;
- _net->signalWorkAvailable();
- _net->exitNetwork();
-
- // Since there is only one worker thread, we need to consume tasks here to potentially
- // unblock that thread.
- while (!_tasks.empty()) {
- _consumeOneTask(lk);
+ LOG(1) << "Starting to consume tasks";
+ while (!(_inShutdown && _tasks.empty())) {
+ if (_tasks.empty()) {
+ lk->unlock();
+ _net->waitForWork();
+ lk->lock();
+ continue;
+ }
+ auto next = static_cast<size_t>(_prng.nextInt64(static_cast<int64_t>(_tasks.size())));
+ if (next + 1 != _tasks.size()) {
+ swap(_tasks[next], _tasks.back());
+ }
+ Task fn = std::move(_tasks.back());
+ _tasks.pop_back();
+ lk->unlock();
+ fn();
+ lk->lock();
}
+ LOG(1) << "Done consuming tasks";
- if (_started) {
- lk.unlock();
- _worker.join();
- lk.lock();
+ invariant(_tasks.empty());
+
+ while (_started && !_joining) {
+ lk->unlock();
+ _net->waitForWork();
+ lk->lock();
}
- invariant(_tasks.empty());
+ LOG(1) << "Ready to join";
}
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.h b/src/mongo/executor/thread_pool_mock.h
index e3baaa07273..3d510ffa3a9 100644
--- a/src/mongo/executor/thread_pool_mock.h
+++ b/src/mongo/executor/thread_pool_mock.h
@@ -70,12 +70,10 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
+ Status schedule(Task task) override;
private:
- void _consumeOneTask(stdx::unique_lock<stdx::mutex>& lk);
- void _shutdown(stdx::unique_lock<stdx::mutex>& lk);
- void _join(stdx::unique_lock<stdx::mutex>& lk);
+ void consumeTasks(stdx::unique_lock<stdx::mutex>* lk);
// These are the options with which the pool was configured at construction time.
const Options _options;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 9496b88d21f..fefecf9a04e 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -597,9 +597,7 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
-
- auto checkStatus = [&] { return _pool->execute([] {}).getNoThrow(); };
- while (!ErrorCodes::isCancelationError(checkStatus().code())) {
+ while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
sleepmillis(100);
}
}
@@ -613,24 +611,16 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
cbState->canceled.store(1);
- _pool->schedule([this, cbState](auto status) {
- invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
-
- runCallback(std::move(cbState));
- });
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ invariant(status.isOK() || status == ErrorCodes::ShutdownInProgress);
});
} else {
- _pool->schedule([this, cbState](auto status) {
- if (ErrorCodes::isCancelationError(status.code())) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- cbState->canceled.store(1);
- } else {
- fassert(28735, status);
- }
-
- runCallback(std::move(cbState));
- });
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
}
}
_net->signalWorkAvailable();