summaryrefslogtreecommitdiff
path: root/src/mongo/executor/thread_pool_task_executor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor/thread_pool_task_executor.cpp')
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp59
1 files changed, 37 insertions, 22 deletions
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 202de888b5a..7d42893edb2 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -43,6 +43,7 @@
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/transport/baton.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -59,15 +60,17 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
MONGO_DISALLOW_COPYING(CallbackState);
public:
- static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) {
- return std::make_shared<CallbackState>(std::move(cb), readyDate);
+ static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
+ Date_t readyDate,
+ const transport::BatonHandle& baton) {
+ return std::make_shared<CallbackState>(std::move(cb), readyDate, baton);
}
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn&& cb, Date_t theReadyDate)
- : callback(std::move(cb)), readyDate(theReadyDate) {}
+ CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton)
+ : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {}
virtual ~CallbackState() = default;
@@ -94,6 +97,7 @@ public:
bool isNetworkOperation = false;
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
+ transport::BatonHandle baton;
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -125,7 +129,7 @@ public:
};
ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
- std::unique_ptr<NetworkInterface> net)
+ std::shared_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
@@ -272,7 +276,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
- auto wq = makeSingletonWorkQueue(work);
+ auto wq = makeSingletonWorkQueue(work, nullptr);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
@@ -319,7 +323,7 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
const CallbackFn& work) {
- auto wq = makeSingletonWorkQueue(work);
+ auto wq = makeSingletonWorkQueue(work, nullptr);
WorkQueue temp;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
@@ -335,7 +339,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
if (when <= now()) {
return scheduleWork(work);
}
- auto wq = makeSingletonWorkQueue(work, when);
+ auto wq = makeSingletonWorkQueue(work, nullptr, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
@@ -354,7 +358,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
return;
}
scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
- })
+ },
+ nullptr)
.transitional_ignore();
return cbHandle;
@@ -389,7 +394,9 @@ void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData,
} // namespace
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand(
- const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton) {
RemoteCommandRequest scheduledRequest = request;
if (request.timeout == RemoteCommandRequest::kNoTimeout) {
scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
@@ -399,9 +406,11 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
// In case the request fails to even get a connection from the pool,
// we wrap the callback in a method that prepares its input parameters.
- auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) {
- remoteCommandFailedEarly(cbData, cb, scheduledRequest);
- });
+ auto wq = makeSingletonWorkQueue(
+ [scheduledRequest, cb](const CallbackArgs& cbData) {
+ remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+ },
+ baton);
wq.front()->isNetworkOperation = true;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
@@ -427,7 +436,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
: response.status.toString());
swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
- })
+ },
+ baton)
.transitional_ignore();
return cbHandle;
}
@@ -442,7 +452,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
cbState->canceled.store(1);
if (cbState->isNetworkOperation) {
lk.unlock();
- _net->cancelCommand(cbHandle);
+ _net->cancelCommand(cbHandle, cbState->baton);
return;
}
if (cbState->readyDate != Date_t{}) {
@@ -492,10 +502,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback
return cbHandle;
}
-ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work,
- Date_t when) {
+ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(
+ CallbackFn work, const transport::BatonHandle& baton, Date_t when) {
WorkQueue result;
- result.emplace_front(CallbackState::make(std::move(work), when));
+ result.emplace_front(CallbackState::make(std::move(work), when, baton));
result.front()->iter = result.begin();
return result;
}
@@ -547,10 +557,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
}
for (const auto& cbState : todo) {
- const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
- if (status == ErrorCodes::ShutdownInProgress)
- break;
- fassert(28735, status);
+ if (cbState->baton) {
+ cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ } else {
+ const auto status =
+ _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
+ }
}
_net->signalWorkAvailable();
}