diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/repl/task_runner.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/repl/task_runner.cpp')
-rw-r--r-- | src/mongo/db/repl/task_runner.cpp | 255 |
1 files changed, 126 insertions, 129 deletions
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index fc0a594ac83..385a76207cd 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -46,165 +46,162 @@ namespace repl { namespace { - /** - * Runs a single task runner task. - * Any exceptions thrown by the task will be logged and converted into a - * next action of kCancel. - */ - TaskRunner::NextAction runSingleTask(const TaskRunner::Task& task, - OperationContext* txn, - const Status& status) { - try { - return task(txn, status); +/** + * Runs a single task runner task. + * Any exceptions thrown by the task will be logged and converted into a + * next action of kCancel. + */ +TaskRunner::NextAction runSingleTask(const TaskRunner::Task& task, + OperationContext* txn, + const Status& status) { + try { + return task(txn, status); + } catch (...) { + log() << "Unhandled exception in task runner: " << exceptionToStatus(); + } + return TaskRunner::NextAction::kCancel; +} + +} // namespace + +// static +TaskRunner::Task TaskRunner::makeCancelTask() { + return [](OperationContext* txn, const Status& status) { return NextAction::kCancel; }; +} + +TaskRunner::TaskRunner(OldThreadPool* threadPool, + const CreateOperationContextFn& createOperationContext) + : _threadPool(threadPool), + _createOperationContext(createOperationContext), + _active(false), + _cancelRequested(false) { + uassert(ErrorCodes::BadValue, "null thread pool", threadPool); + uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext); +} + +TaskRunner::~TaskRunner() { + try { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_active) { + return; } - catch (...) { - log() << "Unhandled exception in task runner: " << exceptionToStatus(); + _cancelRequested = true; + _condition.notify_all(); + while (_active) { + _condition.wait(lk); } - return TaskRunner::NextAction::kCancel; + } catch (...) { + error() << "unexpected exception destroying task runner: " << exceptionToStatus(); } +} -} // namespace +std::string TaskRunner::getDiagnosticString() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "TaskRunner"; + output << " scheduled tasks: " << _tasks.size(); + output << " active: " << _active; + output << " cancel requested: " << _cancelRequested; + return output; +} - // static - TaskRunner::Task TaskRunner::makeCancelTask() { - return [](OperationContext* txn, const Status& status) { - return NextAction::kCancel; - }; - } +bool TaskRunner::isActive() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _active; +} - TaskRunner::TaskRunner(OldThreadPool* threadPool, - const CreateOperationContextFn& createOperationContext) - : _threadPool(threadPool), - _createOperationContext(createOperationContext), - _active(false), - _cancelRequested(false) { +void TaskRunner::schedule(const Task& task) { + invariant(task); - uassert(ErrorCodes::BadValue, "null thread pool", threadPool); - uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext); - } + stdx::lock_guard<stdx::mutex> lk(_mutex); - TaskRunner::~TaskRunner() { - try { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (!_active) { - return; - } - _cancelRequested = true; - _condition.notify_all(); - while (_active) { - _condition.wait(lk); - } - } - catch (...) { - error() << "unexpected exception destroying task runner: " << exceptionToStatus(); - } - } + _tasks.push_back(task); + _condition.notify_all(); - std::string TaskRunner::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "TaskRunner"; - output << " scheduled tasks: " << _tasks.size(); - output << " active: " << _active; - output << " cancel requested: " << _cancelRequested; - return output; + if (_active) { + return; } - bool TaskRunner::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } + _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this)); - void TaskRunner::schedule(const Task& task) { - invariant(task); + _active = true; + _cancelRequested = false; +} - stdx::lock_guard<stdx::mutex> lk(_mutex); +void TaskRunner::cancel() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _cancelRequested = true; + _condition.notify_all(); +} - _tasks.push_back(task); - _condition.notify_all(); +void TaskRunner::_runTasks() { + std::unique_ptr<OperationContext> txn; - if (_active) { - return; + while (Task task = _waitForNextTask()) { + if (!txn) { + txn.reset(_createOperationContext()); } - _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this)); - - _active = true; - _cancelRequested = false; - } + NextAction nextAction = runSingleTask(task, txn.get(), Status::OK()); - void TaskRunner::cancel() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _cancelRequested = true; - _condition.notify_all(); - } - - void TaskRunner::_runTasks() { - std::unique_ptr<OperationContext> txn; - - while (Task task = _waitForNextTask()) { - if (!txn) { - txn.reset(_createOperationContext()); - } - - NextAction nextAction = runSingleTask(task, txn.get(), Status::OK()); - - if (nextAction != NextAction::kKeepOperationContext) { - txn.reset(); - } - - if (nextAction == NextAction::kCancel) { - break; - } - // Release thread back to pool after disposing if no scheduled tasks in queue. - if (nextAction == NextAction::kDisposeOperationContext || - nextAction == NextAction::kInvalid) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_tasks.empty()) { - _finishRunTasks_inlock(); - return; - } - } + if (nextAction != NextAction::kKeepOperationContext) { + txn.reset(); } - txn.reset(); - std::list<Task> tasks; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - tasks.swap(_tasks); + if (nextAction == NextAction::kCancel) { + break; } - - // Cancel remaining tasks with a CallbackCanceled status. - for (auto task : tasks) { - runSingleTask(task, nullptr, Status(ErrorCodes::CallbackCanceled, - "this task has been canceled by a previously invoked task")); + // Release thread back to pool after disposing if no scheduled tasks in queue. + if (nextAction == NextAction::kDisposeOperationContext || + nextAction == NextAction::kInvalid) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_tasks.empty()) { + _finishRunTasks_inlock(); + return; + } } + } + txn.reset(); + std::list<Task> tasks; + { stdx::lock_guard<stdx::mutex> lk(_mutex); - _finishRunTasks_inlock(); + tasks.swap(_tasks); } - void TaskRunner::_finishRunTasks_inlock() { - _active = false; - _cancelRequested = false; - _condition.notify_all(); + // Cancel remaining tasks with a CallbackCanceled status. + for (auto task : tasks) { + runSingleTask(task, + nullptr, + Status(ErrorCodes::CallbackCanceled, + "this task has been canceled by a previously invoked task")); } - TaskRunner::Task TaskRunner::_waitForNextTask() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _finishRunTasks_inlock(); +} - while (_tasks.empty() && !_cancelRequested) { - _condition.wait(lk); - } +void TaskRunner::_finishRunTasks_inlock() { + _active = false; + _cancelRequested = false; + _condition.notify_all(); +} - if (_cancelRequested) { - return Task(); - } +TaskRunner::Task TaskRunner::_waitForNextTask() { + stdx::unique_lock<stdx::mutex> lk(_mutex); - Task task = _tasks.front(); - _tasks.pop_front(); - return task; + while (_tasks.empty() && !_cancelRequested) { + _condition.wait(lk); } -} // namespace repl -} // namespace mongo + if (_cancelRequested) { + return Task(); + } + + Task task = _tasks.front(); + _tasks.pop_front(); + return task; +} + +} // namespace repl +} // namespace mongo |