diff options
Diffstat (limited to 'src/mongo/executor/thread_pool_task_executor.cpp')
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 59 |
1 files changed, 37 insertions, 22 deletions
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 202de888b5a..7d42893edb2 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -43,6 +43,7 @@ #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface.h" #include "mongo/platform/atomic_word.h" +#include "mongo/transport/baton.h" #include "mongo/util/concurrency/thread_pool_interface.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -59,15 +60,17 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState MONGO_DISALLOW_COPYING(CallbackState); public: - static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) { - return std::make_shared<CallbackState>(std::move(cb), readyDate); + static std::shared_ptr<CallbackState> make(CallbackFn&& cb, + Date_t readyDate, + const transport::BatonHandle& baton) { + return std::make_shared<CallbackState>(std::move(cb), readyDate, baton); } /** * Do not call directly. Use make. */ - CallbackState(CallbackFn&& cb, Date_t theReadyDate) - : callback(std::move(cb)), readyDate(theReadyDate) {} + CallbackState(CallbackFn&& cb, Date_t theReadyDate, const transport::BatonHandle& baton) + : callback(std::move(cb)), readyDate(theReadyDate), baton(baton) {} virtual ~CallbackState() = default; @@ -94,6 +97,7 @@ public: bool isNetworkOperation = false; AtomicWord<bool> isFinished{false}; boost::optional<stdx::condition_variable> finishedCondition; + transport::BatonHandle baton; }; class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { @@ -125,7 +129,7 @@ public: }; ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool, - std::unique_ptr<NetworkInterface> net) + std::shared_ptr<NetworkInterface> net) : _net(std::move(net)), _pool(std::move(pool)) {} ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() { @@ -272,7 +276,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E if (!event.isValid()) { return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"}; } - auto wq = makeSingletonWorkQueue(work); + auto wq = makeSingletonWorkQueue(work, nullptr); stdx::unique_lock<stdx::mutex> lk(_mutex); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq); @@ -319,7 +323,7 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( const CallbackFn& work) { - auto wq = makeSingletonWorkQueue(work); + auto wq = makeSingletonWorkQueue(work, nullptr); WorkQueue temp; stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&temp, &wq); @@ -335,7 +339,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( if (when <= now()) { return scheduleWork(work); } - auto wq = makeSingletonWorkQueue(work, when); + auto wq = makeSingletonWorkQueue(work, nullptr, when); stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq); if (!cbHandle.isOK()) { @@ -354,7 +358,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( return; } scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); - }) + }, + nullptr) .transitional_ignore(); return cbHandle; @@ -389,7 +394,9 @@ void remoteCommandFailedEarly(const TaskExecutor::CallbackArgs& cbData, } // namespace StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteCommand( - const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) { + const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const transport::BatonHandle& baton) { RemoteCommandRequest scheduledRequest = request; if (request.timeout == RemoteCommandRequest::kNoTimeout) { scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate; @@ -399,9 +406,11 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC // In case the request fails to even get a connection from the pool, // we wrap the callback in a method that prepares its input parameters. - auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) { - remoteCommandFailedEarly(cbData, cb, scheduledRequest); - }); + auto wq = makeSingletonWorkQueue( + [scheduledRequest, cb](const CallbackArgs& cbData) { + remoteCommandFailedEarly(cbData, cb, scheduledRequest); + }, + baton); wq.front()->isNetworkOperation = true; stdx::unique_lock<stdx::mutex> lk(_mutex); auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq); @@ -427,7 +436,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC : response.status.toString()); swap(cbState->callback, newCb); scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); - }) + }, + baton) .transitional_ignore(); return cbHandle; } @@ -442,7 +452,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) { cbState->canceled.store(1); if (cbState->isNetworkOperation) { lk.unlock(); - _net->cancelCommand(cbHandle); + _net->cancelCommand(cbHandle, cbState->baton); return; } if (cbState->readyDate != Date_t{}) { @@ -492,10 +502,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallback return cbHandle; } -ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work, - Date_t when) { +ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue( + CallbackFn work, const transport::BatonHandle& baton, Date_t when) { WorkQueue result; - result.emplace_front(CallbackState::make(std::move(work), when)); + result.emplace_front(CallbackState::make(std::move(work), when, baton)); result.front()->iter = result.begin(); return result; } @@ -547,10 +557,15 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, } for (const auto& cbState : todo) { - const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); - if (status == ErrorCodes::ShutdownInProgress) - break; - fassert(28735, status); + if (cbState->baton) { + cbState->baton->schedule([this, cbState] { runCallback(std::move(cbState)); }); + } else { + const auto status = + _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + if (status == ErrorCodes::ShutdownInProgress) + break; + fassert(28735, status); + } } _net->signalWorkAvailable(); } |