diff options
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 81 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 13 |
2 files changed, 43 insertions, 51 deletions
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 75628f86483..03521017846 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -32,6 +32,7 @@ #include "mongo/executor/thread_pool_task_executor.h" +#include <boost/optional.hpp> #include <iterator> #include "mongo/base/checked_cast.h" @@ -49,19 +50,15 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState MONGO_DISALLOW_COPYING(CallbackState); public: - static std::shared_ptr<CallbackState> make(CallbackFn&& cb, - EventHandle&& finishedEvent, - Date_t readyDate) { - return std::make_shared<CallbackState>(std::move(cb), std::move(finishedEvent), readyDate); + static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) { + return std::make_shared<CallbackState>(std::move(cb), readyDate); } /** * Do not call directly. Use make. */ - CallbackState(CallbackFn&& cb, EventHandle&& theFinishedEvent, Date_t theReadyDate) - : callback(std::move(cb)), - finishedEvent(std::move(theFinishedEvent)), - readyDate(theReadyDate) {} + CallbackState(CallbackFn&& cb, Date_t theReadyDate) + : callback(std::move(cb)), readyDate(theReadyDate) {} virtual ~CallbackState() = default; @@ -82,11 +79,12 @@ public: // _mutex. CallbackFn callback; - EventHandle finishedEvent; AtomicUInt32 canceled{0U}; WorkQueue::iterator iter; Date_t readyDate; bool isNetworkOperation = false; + AtomicWord<bool> isFinished{false}; + boost::optional<stdx::condition_variable> finishedCondition; }; class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { @@ -117,11 +115,6 @@ public: WorkQueue waiters; }; -struct ThreadPoolTaskExecutor::WQEL { - WorkQueue workQueue; - EventList eventList; -}; - ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool, std::unique_ptr<NetworkInterface> net) : _net(std::move(net)), _pool(std::move(pool)) {} @@ -206,10 +199,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E if (!event.isValid()) { return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"}; } - auto wqel = makeSingletonWork(work); + auto wq = makeSingletonWorkQueue(work); stdx::lock_guard<stdx::mutex> lk(_mutex); auto eventState = checked_cast<EventState*>(getEventFromHandle(event)); - auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, std::move(wqel)); + auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq); if (!cbHandle.isOK()) { return cbHandle; } @@ -230,10 +223,10 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) { StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork( const CallbackFn& work) { - auto wqel = makeSingletonWork(work); + auto wq = makeSingletonWorkQueue(work); WorkQueue temp; stdx::lock_guard<stdx::mutex> lk(_mutex); - auto cbHandle = enqueueCallbackState_inlock(&temp, std::move(wqel)); + auto cbHandle = enqueueCallbackState_inlock(&temp, &wq); if (!cbHandle.isOK()) { return cbHandle; } @@ -246,9 +239,9 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( if (when <= now()) { return scheduleWork(work); } - auto wqel = makeSingletonWork(work, when); + auto wq = makeSingletonWorkQueue(work, when); stdx::unique_lock<stdx::mutex> lk(_mutex); - auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, std::move(wqel)); + auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq); if (!cbHandle.isOK()) { return cbHandle; } @@ -301,12 +294,12 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC } else { scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout; } - auto wqel = makeSingletonWork([scheduledRequest, cb](const CallbackArgs& cbData) { + auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) { remoteCommandFailedEarly(cbData, cb, scheduledRequest); }); - wqel.workQueue.front()->isNetworkOperation = true; + wq.front()->isNetworkOperation = true; stdx::unique_lock<stdx::mutex> lk(_mutex); - auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, std::move(wqel)); + auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq); if (!cbHandle.isOK()) return cbHandle; const auto cbState = _networkInProgressQueue.back(); @@ -362,7 +355,16 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) { void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) { invariant(cbHandle.isValid()); auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); - waitForEvent(cbState->finishedEvent); + if (cbState->isFinished.load()) { + return; + } + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!cbState->finishedCondition) { + cbState->finishedCondition.emplace(); + } + while (!cbState->isFinished.load()) { + cbState->finishedCondition->wait(lk); + } } void ThreadPoolTaskExecutor::appendConnectionStats(BSONObjBuilder* b) { @@ -374,29 +376,23 @@ void ThreadPoolTaskExecutor::cancelAllCommands() { } StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock( - WorkQueue* queue, WQEL&& wqel) { + WorkQueue* queue, WorkQueue* wq) { if (_inShutdown) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } - invariant(!wqel.eventList.empty()); - _unsignaledEvents.splice(_unsignaledEvents.end(), wqel.eventList, wqel.eventList.begin()); - invariant(wqel.eventList.empty()); - invariant(!wqel.workQueue.empty()); - queue->splice(queue->end(), wqel.workQueue, wqel.workQueue.begin()); - invariant(wqel.workQueue.empty()); + invariant(!wq->empty()); + queue->splice(queue->end(), *wq, wq->begin()); + invariant(wq->empty()); CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, queue->back()); return cbHandle; } -ThreadPoolTaskExecutor::WQEL ThreadPoolTaskExecutor::makeSingletonWork(CallbackFn work, - Date_t when) { - WQEL result; - result.eventList = makeSingletonEventList(); - EventHandle event; - setEventForHandle(&event, result.eventList.front()); - result.workQueue.emplace_front(CallbackState::make(std::move(work), std::move(event), when)); - result.workQueue.front()->iter = result.workQueue.begin(); +ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work, + Date_t when) { + WorkQueue result; + result.emplace_front(CallbackState::make(std::move(work), when)); + result.front()->iter = result.begin(); return result; } @@ -450,11 +446,12 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) : Status::OK()); cbStatePtr->callback(std::move(args)); + cbStatePtr->isFinished.store(true); stdx::lock_guard<stdx::mutex> lk(_mutex); - if (cbStatePtr->finishedEvent.isValid()) { - signalEvent_inlock(cbStatePtr->finishedEvent); - } _poolInProgressQueue.erase(cbStatePtr->iter); + if (cbStatePtr->finishedCondition) { + cbStatePtr->finishedCondition->notify_all(); + } } } // namespace executor diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 071d616aeca..926aec337b3 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -94,11 +94,6 @@ private: using EventList = stdx::list<std::shared_ptr<EventState>>; /** - * Structure returned by makeSingletonWork, for passing into enqueueCallbackState_inlock. - */ - struct WQEL; - - /** * Returns an EventList containing one unsignaled EventState. This is a helper function for * performing allocations outside of _mutex, and should only be called by makeSingletonWork and * makeEvent(). @@ -110,13 +105,13 @@ private: * executing "work" no sooner than "when" (defaults to ASAP). This function may and should be * called outside of _mutex. */ - static WQEL makeSingletonWork(CallbackFn work, Date_t when = {}); + static WorkQueue makeSingletonWorkQueue(CallbackFn work, Date_t when = {}); /** - * Creates a new callback on "queue" to do the work described by "wqel", which was - * itself produced via makeSingletonWork(). + * Moves the single callback in "wq" to the end of "queue". It is required that "wq" was + * produced via a call to makeSingletonWorkQueue(). */ - StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue, WQEL&& wqel); + StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue, WorkQueue* wq); /** * Signals the given event. |