summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/task_runner.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 00:22:50 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 10:56:02 -0400
commit9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch)
tree3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/repl/task_runner.cpp
parent01965cf52bce6976637ecb8f4a622aeb05ab256a (diff)
downloadmongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/repl/task_runner.cpp')
-rw-r--r--src/mongo/db/repl/task_runner.cpp255
1 files changed, 126 insertions, 129 deletions
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index fc0a594ac83..385a76207cd 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -46,165 +46,162 @@ namespace repl {
namespace {
- /**
- * Runs a single task runner task.
- * Any exceptions thrown by the task will be logged and converted into a
- * next action of kCancel.
- */
- TaskRunner::NextAction runSingleTask(const TaskRunner::Task& task,
- OperationContext* txn,
- const Status& status) {
- try {
- return task(txn, status);
+/**
+ * Runs a single task runner task.
+ * Any exceptions thrown by the task will be logged and converted into a
+ * next action of kCancel.
+ */
+TaskRunner::NextAction runSingleTask(const TaskRunner::Task& task,
+ OperationContext* txn,
+ const Status& status) {
+ try {
+ return task(txn, status);
+ } catch (...) {
+ log() << "Unhandled exception in task runner: " << exceptionToStatus();
+ }
+ return TaskRunner::NextAction::kCancel;
+}
+
+} // namespace
+
+// static
+TaskRunner::Task TaskRunner::makeCancelTask() {
+ return [](OperationContext* txn, const Status& status) { return NextAction::kCancel; };
+}
+
+TaskRunner::TaskRunner(OldThreadPool* threadPool,
+ const CreateOperationContextFn& createOperationContext)
+ : _threadPool(threadPool),
+ _createOperationContext(createOperationContext),
+ _active(false),
+ _cancelRequested(false) {
+ uassert(ErrorCodes::BadValue, "null thread pool", threadPool);
+ uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext);
+}
+
+TaskRunner::~TaskRunner() {
+ try {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!_active) {
+ return;
}
- catch (...) {
- log() << "Unhandled exception in task runner: " << exceptionToStatus();
+ _cancelRequested = true;
+ _condition.notify_all();
+ while (_active) {
+ _condition.wait(lk);
}
- return TaskRunner::NextAction::kCancel;
+ } catch (...) {
+ error() << "unexpected exception destroying task runner: " << exceptionToStatus();
}
+}
-} // namespace
+std::string TaskRunner::getDiagnosticString() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ str::stream output;
+ output << "TaskRunner";
+ output << " scheduled tasks: " << _tasks.size();
+ output << " active: " << _active;
+ output << " cancel requested: " << _cancelRequested;
+ return output;
+}
- // static
- TaskRunner::Task TaskRunner::makeCancelTask() {
- return [](OperationContext* txn, const Status& status) {
- return NextAction::kCancel;
- };
- }
+bool TaskRunner::isActive() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _active;
+}
- TaskRunner::TaskRunner(OldThreadPool* threadPool,
- const CreateOperationContextFn& createOperationContext)
- : _threadPool(threadPool),
- _createOperationContext(createOperationContext),
- _active(false),
- _cancelRequested(false) {
+void TaskRunner::schedule(const Task& task) {
+ invariant(task);
- uassert(ErrorCodes::BadValue, "null thread pool", threadPool);
- uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext);
- }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- TaskRunner::~TaskRunner() {
- try {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (!_active) {
- return;
- }
- _cancelRequested = true;
- _condition.notify_all();
- while (_active) {
- _condition.wait(lk);
- }
- }
- catch (...) {
- error() << "unexpected exception destroying task runner: " << exceptionToStatus();
- }
- }
+ _tasks.push_back(task);
+ _condition.notify_all();
- std::string TaskRunner::getDiagnosticString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "TaskRunner";
- output << " scheduled tasks: " << _tasks.size();
- output << " active: " << _active;
- output << " cancel requested: " << _cancelRequested;
- return output;
+ if (_active) {
+ return;
}
- bool TaskRunner::isActive() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
- }
+ _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this));
- void TaskRunner::schedule(const Task& task) {
- invariant(task);
+ _active = true;
+ _cancelRequested = false;
+}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void TaskRunner::cancel() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _cancelRequested = true;
+ _condition.notify_all();
+}
- _tasks.push_back(task);
- _condition.notify_all();
+void TaskRunner::_runTasks() {
+ std::unique_ptr<OperationContext> txn;
- if (_active) {
- return;
+ while (Task task = _waitForNextTask()) {
+ if (!txn) {
+ txn.reset(_createOperationContext());
}
- _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this));
-
- _active = true;
- _cancelRequested = false;
- }
+ NextAction nextAction = runSingleTask(task, txn.get(), Status::OK());
- void TaskRunner::cancel() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _cancelRequested = true;
- _condition.notify_all();
- }
-
- void TaskRunner::_runTasks() {
- std::unique_ptr<OperationContext> txn;
-
- while (Task task = _waitForNextTask()) {
- if (!txn) {
- txn.reset(_createOperationContext());
- }
-
- NextAction nextAction = runSingleTask(task, txn.get(), Status::OK());
-
- if (nextAction != NextAction::kKeepOperationContext) {
- txn.reset();
- }
-
- if (nextAction == NextAction::kCancel) {
- break;
- }
- // Release thread back to pool after disposing if no scheduled tasks in queue.
- if (nextAction == NextAction::kDisposeOperationContext ||
- nextAction == NextAction::kInvalid) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_tasks.empty()) {
- _finishRunTasks_inlock();
- return;
- }
- }
+ if (nextAction != NextAction::kKeepOperationContext) {
+ txn.reset();
}
- txn.reset();
- std::list<Task> tasks;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- tasks.swap(_tasks);
+ if (nextAction == NextAction::kCancel) {
+ break;
}
-
- // Cancel remaining tasks with a CallbackCanceled status.
- for (auto task : tasks) {
- runSingleTask(task, nullptr, Status(ErrorCodes::CallbackCanceled,
- "this task has been canceled by a previously invoked task"));
+ // Release thread back to pool after disposing if no scheduled tasks in queue.
+ if (nextAction == NextAction::kDisposeOperationContext ||
+ nextAction == NextAction::kInvalid) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_tasks.empty()) {
+ _finishRunTasks_inlock();
+ return;
+ }
}
+ }
+ txn.reset();
+ std::list<Task> tasks;
+ {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _finishRunTasks_inlock();
+ tasks.swap(_tasks);
}
- void TaskRunner::_finishRunTasks_inlock() {
- _active = false;
- _cancelRequested = false;
- _condition.notify_all();
+ // Cancel remaining tasks with a CallbackCanceled status.
+ for (auto task : tasks) {
+ runSingleTask(task,
+ nullptr,
+ Status(ErrorCodes::CallbackCanceled,
+ "this task has been canceled by a previously invoked task"));
}
- TaskRunner::Task TaskRunner::_waitForNextTask() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _finishRunTasks_inlock();
+}
- while (_tasks.empty() && !_cancelRequested) {
- _condition.wait(lk);
- }
+void TaskRunner::_finishRunTasks_inlock() {
+ _active = false;
+ _cancelRequested = false;
+ _condition.notify_all();
+}
- if (_cancelRequested) {
- return Task();
- }
+TaskRunner::Task TaskRunner::_waitForNextTask() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
- Task task = _tasks.front();
- _tasks.pop_front();
- return task;
+ while (_tasks.empty() && !_cancelRequested) {
+ _condition.wait(lk);
}
-} // namespace repl
-} // namespace mongo
+ if (_cancelRequested) {
+ return Task();
+ }
+
+ Task task = _tasks.front();
+ _tasks.pop_front();
+ return task;
+}
+
+} // namespace repl
+} // namespace mongo