summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp81
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h13
2 files changed, 43 insertions, 51 deletions
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 75628f86483..03521017846 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -32,6 +32,7 @@
#include "mongo/executor/thread_pool_task_executor.h"
+#include <boost/optional.hpp>
#include <iterator>
#include "mongo/base/checked_cast.h"
@@ -49,19 +50,15 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
MONGO_DISALLOW_COPYING(CallbackState);
public:
- static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
- EventHandle&& finishedEvent,
- Date_t readyDate) {
- return std::make_shared<CallbackState>(std::move(cb), std::move(finishedEvent), readyDate);
+ static std::shared_ptr<CallbackState> make(CallbackFn&& cb, Date_t readyDate) {
+ return std::make_shared<CallbackState>(std::move(cb), readyDate);
}
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn&& cb, EventHandle&& theFinishedEvent, Date_t theReadyDate)
- : callback(std::move(cb)),
- finishedEvent(std::move(theFinishedEvent)),
- readyDate(theReadyDate) {}
+ CallbackState(CallbackFn&& cb, Date_t theReadyDate)
+ : callback(std::move(cb)), readyDate(theReadyDate) {}
virtual ~CallbackState() = default;
@@ -82,11 +79,12 @@ public:
// _mutex.
CallbackFn callback;
- EventHandle finishedEvent;
AtomicUInt32 canceled{0U};
WorkQueue::iterator iter;
Date_t readyDate;
bool isNetworkOperation = false;
+ AtomicWord<bool> isFinished{false};
+ boost::optional<stdx::condition_variable> finishedCondition;
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -117,11 +115,6 @@ public:
WorkQueue waiters;
};
-struct ThreadPoolTaskExecutor::WQEL {
- WorkQueue workQueue;
- EventList eventList;
-};
-
ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
std::unique_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
@@ -206,10 +199,10 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const E
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
- auto wqel = makeSingletonWork(work);
+ auto wq = makeSingletonWorkQueue(work);
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
- auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, std::move(wqel));
+ auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
}
@@ -230,10 +223,10 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
const CallbackFn& work) {
- auto wqel = makeSingletonWork(work);
+ auto wq = makeSingletonWorkQueue(work);
WorkQueue temp;
stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto cbHandle = enqueueCallbackState_inlock(&temp, std::move(wqel));
+ auto cbHandle = enqueueCallbackState_inlock(&temp, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
}
@@ -246,9 +239,9 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
if (when <= now()) {
return scheduleWork(work);
}
- auto wqel = makeSingletonWork(work, when);
+ auto wq = makeSingletonWorkQueue(work, when);
stdx::unique_lock<stdx::mutex> lk(_mutex);
- auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, std::move(wqel));
+ auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, &wq);
if (!cbHandle.isOK()) {
return cbHandle;
}
@@ -301,12 +294,12 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
} else {
scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout;
}
- auto wqel = makeSingletonWork([scheduledRequest, cb](const CallbackArgs& cbData) {
+ auto wq = makeSingletonWorkQueue([scheduledRequest, cb](const CallbackArgs& cbData) {
remoteCommandFailedEarly(cbData, cb, scheduledRequest);
});
- wqel.workQueue.front()->isNetworkOperation = true;
+ wq.front()->isNetworkOperation = true;
stdx::unique_lock<stdx::mutex> lk(_mutex);
- auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, std::move(wqel));
+ auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
if (!cbHandle.isOK())
return cbHandle;
const auto cbState = _networkInProgressQueue.back();
@@ -362,7 +355,16 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
void ThreadPoolTaskExecutor::wait(const CallbackHandle& cbHandle) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
- waitForEvent(cbState->finishedEvent);
+ if (cbState->isFinished.load()) {
+ return;
+ }
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!cbState->finishedCondition) {
+ cbState->finishedCondition.emplace();
+ }
+ while (!cbState->isFinished.load()) {
+ cbState->finishedCondition->wait(lk);
+ }
}
void ThreadPoolTaskExecutor::appendConnectionStats(BSONObjBuilder* b) {
@@ -374,29 +376,23 @@ void ThreadPoolTaskExecutor::cancelAllCommands() {
}
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
- WorkQueue* queue, WQEL&& wqel) {
+ WorkQueue* queue, WorkQueue* wq) {
if (_inShutdown) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
- invariant(!wqel.eventList.empty());
- _unsignaledEvents.splice(_unsignaledEvents.end(), wqel.eventList, wqel.eventList.begin());
- invariant(wqel.eventList.empty());
- invariant(!wqel.workQueue.empty());
- queue->splice(queue->end(), wqel.workQueue, wqel.workQueue.begin());
- invariant(wqel.workQueue.empty());
+ invariant(!wq->empty());
+ queue->splice(queue->end(), *wq, wq->begin());
+ invariant(wq->empty());
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, queue->back());
return cbHandle;
}
-ThreadPoolTaskExecutor::WQEL ThreadPoolTaskExecutor::makeSingletonWork(CallbackFn work,
- Date_t when) {
- WQEL result;
- result.eventList = makeSingletonEventList();
- EventHandle event;
- setEventForHandle(&event, result.eventList.front());
- result.workQueue.emplace_front(CallbackState::make(std::move(work), std::move(event), when));
- result.workQueue.front()->iter = result.workQueue.begin();
+ThreadPoolTaskExecutor::WorkQueue ThreadPoolTaskExecutor::makeSingletonWorkQueue(CallbackFn work,
+ Date_t when) {
+ WorkQueue result;
+ result.emplace_front(CallbackState::make(std::move(work), when));
+ result.front()->iter = result.begin();
return result;
}
@@ -450,11 +446,12 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
: Status::OK());
cbStatePtr->callback(std::move(args));
+ cbStatePtr->isFinished.store(true);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (cbStatePtr->finishedEvent.isValid()) {
- signalEvent_inlock(cbStatePtr->finishedEvent);
- }
_poolInProgressQueue.erase(cbStatePtr->iter);
+ if (cbStatePtr->finishedCondition) {
+ cbStatePtr->finishedCondition->notify_all();
+ }
}
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 071d616aeca..926aec337b3 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -94,11 +94,6 @@ private:
using EventList = stdx::list<std::shared_ptr<EventState>>;
/**
- * Structure returned by makeSingletonWork, for passing into enqueueCallbackState_inlock.
- */
- struct WQEL;
-
- /**
* Returns an EventList containing one unsignaled EventState. This is a helper function for
* performing allocations outside of _mutex, and should only be called by makeSingletonWork and
* makeEvent().
@@ -110,13 +105,13 @@ private:
* executing "work" no sooner than "when" (defaults to ASAP). This function may and should be
* called outside of _mutex.
*/
- static WQEL makeSingletonWork(CallbackFn work, Date_t when = {});
+ static WorkQueue makeSingletonWorkQueue(CallbackFn work, Date_t when = {});
/**
- * Creates a new callback on "queue" to do the work described by "wqel", which was
- * itself produced via makeSingletonWork().
+ * Moves the single callback in "wq" to the end of "queue". It is required that "wq" was
+ * produced via a call to makeSingletonWorkQueue().
*/
- StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue, WQEL&& wqel);
+ StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue, WorkQueue* wq);
/**
* Signals the given event.