summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2015-11-09 16:42:46 -0500
committerAndy Schwerin <schwerin@mongodb.com>2015-11-09 16:42:46 -0500
commitfa038b8375fd5aea4d359fe0968beb948de58782 (patch)
tree498d82b60295d621e92b7f5f1197349c2532991a
parentba231028314553f1495e6583d5a59c55919a764c (diff)
downloadmongo-fa038b8375fd5aea4d359fe0968beb948de58782.tar.gz
SERVER-20944 Move as much work as possible outside of ThreadPoolTaskExecutor critical sections.
This patch moves calls to the NetworkInterface and heap allocations out from under the ThreadPoolTaskExecutor's mutex, to minimize critical length and maximize available concurrency.
-rw-r--r--src/mongo/executor/task_executor.h13
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp101
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h26
3 files changed, 91 insertions, 49 deletions
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index f11d66653e5..a52f5003a0b 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -238,19 +238,20 @@ public:
virtual void appendConnectionStats(BSONObjBuilder* b) = 0;
protected:
- TaskExecutor();
-
// Retrieves the Callback from a given CallbackHandle
- CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle);
+ static CallbackState* getCallbackFromHandle(const CallbackHandle& cbHandle);
// Retrieves the Event from a given EventHandle
- EventState* getEventFromHandle(const EventHandle& eventHandle);
+ static EventState* getEventFromHandle(const EventHandle& eventHandle);
// Sets the given CallbackHandle to point to the given callback.
- void setCallbackForHandle(CallbackHandle* cbHandle, std::shared_ptr<CallbackState> callback);
+ static void setCallbackForHandle(CallbackHandle* cbHandle,
+ std::shared_ptr<CallbackState> callback);
// Sets the given EventHandle to point to the given event.
- void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event);
+ static void setEventForHandle(EventHandle* eventHandle, std::shared_ptr<EventState> event);
+
+ TaskExecutor();
};
/**
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 4647273bcf9..75628f86483 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -49,8 +49,8 @@ class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState
MONGO_DISALLOW_COPYING(CallbackState);
public:
- static std::shared_ptr<CallbackState> make(CallbackFn cb,
- EventHandle finishedEvent,
+ static std::shared_ptr<CallbackState> make(CallbackFn&& cb,
+ EventHandle&& finishedEvent,
Date_t readyDate) {
return std::make_shared<CallbackState>(std::move(cb), std::move(finishedEvent), readyDate);
}
@@ -58,7 +58,7 @@ public:
/**
* Do not call directly. Use make.
*/
- CallbackState(CallbackFn cb, EventHandle theFinishedEvent, Date_t theReadyDate)
+ CallbackState(CallbackFn&& cb, EventHandle&& theFinishedEvent, Date_t theReadyDate)
: callback(std::move(cb)),
finishedEvent(std::move(theFinishedEvent)),
readyDate(theReadyDate) {}
@@ -117,6 +117,11 @@ public:
WorkQueue waiters;
};
+struct ThreadPoolTaskExecutor::WQEL {
+ WorkQueue workQueue;
+ EventList eventList;
+};
+
ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterface> pool,
std::unique_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
@@ -180,8 +185,15 @@ Date_t ThreadPoolTaskExecutor::now() {
}
StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
+ auto el = makeSingletonEventList();
+ EventHandle event;
+ setEventForHandle(&event, el.front());
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return makeEvent_inlock();
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
+ }
+ _unsignaledEvents.splice(_unsignaledEvents.end(), el);
+ return event;
}
void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
@@ -191,12 +203,13 @@ void ThreadPoolTaskExecutor::signalEvent(const EventHandle& event) {
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::onEvent(const EventHandle& event,
const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!event.isValid()) {
return {ErrorCodes::BadValue, "Passed invalid event handle to onEvent"};
}
+ auto wqel = makeSingletonWork(work);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto eventState = checked_cast<EventState*>(getEventFromHandle(event));
- auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, work);
+ auto cbHandle = enqueueCallbackState_inlock(&eventState->waiters, std::move(wqel));
if (!cbHandle.isOK()) {
return cbHandle;
}
@@ -217,9 +230,10 @@ void ThreadPoolTaskExecutor::waitForEvent(const EventHandle& event) {
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWork(
const CallbackFn& work) {
+ auto wqel = makeSingletonWork(work);
WorkQueue temp;
stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto cbHandle = enqueueCallbackState_inlock(&temp, work);
+ auto cbHandle = enqueueCallbackState_inlock(&temp, std::move(wqel));
if (!cbHandle.isOK()) {
return cbHandle;
}
@@ -232,11 +246,13 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
if (when <= now()) {
return scheduleWork(work);
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, work, when);
+ auto wqel = makeSingletonWork(work, when);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ auto cbHandle = enqueueCallbackState_inlock(&_sleepersQueue, std::move(wqel));
if (!cbHandle.isOK()) {
return cbHandle;
}
+ lk.unlock();
_net->setAlarm(when,
[this, when, cbHandle] {
auto cbState =
@@ -285,32 +301,35 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
} else {
scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout;
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto cbHandle =
- enqueueCallbackState_inlock(&_networkInProgressQueue,
- [scheduledRequest, cb](const CallbackArgs& cbData) {
- remoteCommandFailedEarly(cbData, cb, scheduledRequest);
- });
+ auto wqel = makeSingletonWork([scheduledRequest, cb](const CallbackArgs& cbData) {
+ remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+ });
+ wqel.workQueue.front()->isNetworkOperation = true;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ auto cbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, std::move(wqel));
if (!cbHandle.isOK())
return cbHandle;
- const auto& cbState = _networkInProgressQueue.back();
- cbState->isNetworkOperation = true;
+ const auto cbState = _networkInProgressQueue.back();
LOG(4) << "Scheduling remote command request: " << scheduledRequest.toString();
+ lk.unlock();
_net->startCommand(cbHandle.getValue(),
scheduledRequest,
[this, scheduledRequest, cbState, cb](const ResponseStatus& response) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ using std::swap;
+ CallbackFn newCb =
+ [cb, scheduledRequest, response](const CallbackArgs& cbData) {
+ remoteCommandFinished(cbData, cb, scheduledRequest, response);
+ };
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_inShutdown) {
return;
}
LOG(3) << "Received remote response: "
<< (response.isOK() ? response.getValue().toString()
: response.getStatus().toString());
- cbState->callback =
- [cb, scheduledRequest, response](const CallbackArgs& cbData) {
- remoteCommandFinished(cbData, cb, scheduledRequest, response);
- };
+ swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter);
+ lk.unlock(); // Lets newCb's destructor run outside _mutex.
});
return cbHandle;
}
@@ -355,27 +374,37 @@ void ThreadPoolTaskExecutor::cancelAllCommands() {
}
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
- WorkQueue* queue, CallbackFn work, Date_t when) {
- auto event = makeEvent_inlock();
- if (!event.isOK()) {
- return event.getStatus();
+ WorkQueue* queue, WQEL&& wqel) {
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
- queue->emplace_back(CallbackState::make(std::move(work), std::move(event.getValue()), when));
- queue->back()->iter = std::prev(queue->end());
+ invariant(!wqel.eventList.empty());
+ _unsignaledEvents.splice(_unsignaledEvents.end(), wqel.eventList, wqel.eventList.begin());
+ invariant(wqel.eventList.empty());
+ invariant(!wqel.workQueue.empty());
+ queue->splice(queue->end(), wqel.workQueue, wqel.workQueue.begin());
+ invariant(wqel.workQueue.empty());
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, queue->back());
return cbHandle;
}
-StatusWith<ThreadPoolTaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent_inlock() {
- if (_inShutdown) {
- return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
- }
- _unsignaledEvents.emplace_front(EventState::make());
- _unsignaledEvents.front()->iter = _unsignaledEvents.begin();
+ThreadPoolTaskExecutor::WQEL ThreadPoolTaskExecutor::makeSingletonWork(CallbackFn work,
+ Date_t when) {
+ WQEL result;
+ result.eventList = makeSingletonEventList();
EventHandle event;
- setEventForHandle(&event, _unsignaledEvents.front());
- return event;
+ setEventForHandle(&event, result.eventList.front());
+ result.workQueue.emplace_front(CallbackState::make(std::move(work), std::move(event), when));
+ result.workQueue.front()->iter = result.workQueue.begin();
+ return result;
+}
+
+ThreadPoolTaskExecutor::EventList ThreadPoolTaskExecutor::makeSingletonEventList() {
+ EventList result;
+ result.emplace_front(EventState::make());
+ result.front()->iter = result.begin();
+ return result;
}
void ThreadPoolTaskExecutor::signalEvent_inlock(const EventHandle& event) {
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index e983c91de49..071d616aeca 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -94,17 +94,29 @@ private:
using EventList = stdx::list<std::shared_ptr<EventState>>;
/**
- * Creates a new callback on "queue" with the "work" function. If "when" is
- * not Date_t{}, the new callback's readyDate is set to "when".
+ * Structure returned by makeSingletonWork, for passing into enqueueCallbackState_inlock.
*/
- StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue,
- CallbackFn work,
- Date_t when = {});
+ struct WQEL;
/**
- * Makes a new event object.
+ * Returns an EventList containing one unsignaled EventState. This is a helper function for
+ * performing allocations outside of _mutex, and should only be called by makeSingletonWork and
+ * makeEvent().
*/
- StatusWith<EventHandle> makeEvent_inlock();
+ static EventList makeSingletonEventList();
+
+ /**
+ * Returns an object suitable for passing to enqueueCallbackState_inlock that represents
+ * executing "work" no sooner than "when" (defaults to ASAP). This function may and should be
+ * called outside of _mutex.
+ */
+ static WQEL makeSingletonWork(CallbackFn work, Date_t when = {});
+
+ /**
+ * Creates a new callback on "queue" to do the work described by "wqel", which was
+ * itself produced via makeSingletonWork().
+ */
+ StatusWith<CallbackHandle> enqueueCallbackState_inlock(WorkQueue* queue, WQEL&& wqel);
/**
* Signals the given event.