summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-04-22 13:39:45 -0400
committerBenety Goh <benety@mongodb.com>2015-05-06 15:30:50 -0400
commitbbb5f157681ad29b379c0ba99d9d26ad6b297864 (patch)
tree9df94e006328a54200536d42b008c417feab165b
parentdbad6dc87c1016ad8385d0a4d609d85910a96811 (diff)
downloadmongo-bbb5f157681ad29b379c0ba99d9d26ad6b297864.tar.gz
SERVER-18028 added data replication task runner and database task
-rw-r--r--src/mongo/db/repl/SConscript50
-rw-r--r--src/mongo/db/repl/database_task.cpp101
-rw-r--r--src/mongo/db/repl/database_task.h75
-rw-r--r--src/mongo/db/repl/database_task_test.cpp186
-rw-r--r--src/mongo/db/repl/task_runner.cpp210
-rw-r--r--src/mongo/db/repl/task_runner.h170
-rw-r--r--src/mongo/db/repl/task_runner_test.cpp339
-rw-r--r--src/mongo/db/repl/task_runner_test_fixture.cpp107
-rw-r--r--src/mongo/db/repl/task_runner_test_fixture.h85
9 files changed, 1323 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index a8e96c93f40..430a6da8880 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -393,3 +393,53 @@ env.CppUnitTest(
'base_cloner_test_fixture',
],
)
+
+env.Library(
+ target='task_runner',
+ source=[
+ 'task_runner.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ ],
+)
+
+env.Library(
+ target='task_runner_test_fixture',
+ source=[
+ 'task_runner_test_fixture.cpp',
+ ],
+ LIBDEPS=[
+ 'task_runner',
+ '$BUILD_DIR/mongo/util/decorable',
+ ],
+)
+
+env.CppUnitTest(
+ target='task_runner_test',
+ source='task_runner_test.cpp',
+ LIBDEPS=[
+ 'task_runner_test_fixture',
+ ],
+)
+
+env.Library(
+ target='database_task',
+ source=[
+ 'database_task.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
+ ],
+)
+
+env.CppUnitTest(
+ target='database_task_test',
+ source='database_task_test.cpp',
+ LIBDEPS=[
+ 'database_task',
+ 'replmocks',
+ 'task_runner_test_fixture',
+ ],
+)
diff --git a/src/mongo/db/repl/database_task.cpp b/src/mongo/db/repl/database_task.cpp
new file mode 100644
index 00000000000..83a6678e93a
--- /dev/null
+++ b/src/mongo/db/repl/database_task.cpp
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/database_task.h"
+#include "mongo/platform/compiler.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace repl {
+
+ // static
+ DatabaseTask::Task DatabaseTask::makeGlobalExclusiveLockTask(const Task& task) {
+ invariant(task);
+ DatabaseTask::Task newTask = [task](OperationContext* txn, const Status& status) {
+ if (!status.isOK()) {
+ return task(txn, status);
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lock(txn->lockState());
+ return task(txn, status);
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "globalExclusiveLockTask", "global");
+ MONGO_COMPILER_UNREACHABLE;
+ };
+ return newTask;
+ }
+
+ // static
+ DatabaseTask::Task DatabaseTask::makeDatabaseLockTask(const Task& task,
+ const std::string& databaseName,
+ LockMode mode) {
+ invariant(task);
+ DatabaseTask::Task newTask = [=](OperationContext* txn, const Status& status) {
+ if (!status.isOK()) {
+ return task(txn, status);
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ LockMode permissiveLockMode = isSharedLockMode(mode) ? MODE_IS : MODE_IX;
+ ScopedTransaction transaction(txn, permissiveLockMode);
+ Lock::DBLock lock(txn->lockState(), databaseName, mode);
+ return task(txn, status);
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "databaseLockTask", databaseName);
+ MONGO_COMPILER_UNREACHABLE;
+ };
+ return newTask;
+ }
+
+ // static
+ DatabaseTask::Task DatabaseTask::makeCollectionLockTask(const Task& task,
+ const NamespaceString& nss,
+ LockMode mode) {
+ invariant(task);
+ DatabaseTask::Task newTask = [=](OperationContext* txn, const Status& status) {
+ if (!status.isOK()) {
+ return task(txn, status);
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ LockMode permissiveLockMode = isSharedLockMode(mode) ? MODE_IS : MODE_IX;
+ ScopedTransaction transaction(txn, permissiveLockMode);
+ Lock::DBLock lock(txn->lockState(), nss.db(), permissiveLockMode);
+ Lock::CollectionLock collectionLock(txn->lockState(), nss.toString(), mode);
+ return task(txn, status);
+ } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "collectionLockTask", nss.toString());
+ MONGO_COMPILER_UNREACHABLE;
+ };
+ return newTask;
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/database_task.h b/src/mongo/db/repl/database_task.h
new file mode 100644
index 00000000000..5a4f09dfdae
--- /dev/null
+++ b/src/mongo/db/repl/database_task.h
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace repl {
+
+ class DatabaseTask{
+ private:
+ DatabaseTask();
+
+ public:
+
+ using Task = TaskRunner::Task;
+
+ /**
+ * Creates a task wrapper that runs the target task inside a global exclusive lock.
+ */
+ static Task makeGlobalExclusiveLockTask(const Task& task);
+
+ /**
+ * Creates a task wrapper that runs the target task inside a database lock.
+ */
+ static Task makeDatabaseLockTask(const Task& task,
+ const std::string& databaseName,
+ LockMode mode);
+
+ /**
+ * Creates a task wrapper that runs the target task inside a collection lock.
+ * Task acquires database lock before attempting to lock collection. Do not
+ * use in combination with makeDatabaseLockTask().
+ */
+ static Task makeCollectionLockTask(const Task& task,
+ const NamespaceString& nss,
+ LockMode mode);
+
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp
new file mode 100644
index 00000000000..b9a37c9ca73
--- /dev/null
+++ b/src/mongo/db/repl/database_task_test.cpp
@@ -0,0 +1,186 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <boost/thread/lock_types.hpp>
+
+#include "mongo/db/repl/database_task.h"
+#include "mongo/db/repl/operation_context_repl_mock.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/db/repl/task_runner_test_fixture.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace {
+
+ using namespace mongo;
+ using namespace mongo::repl;
+
+ const std::string databaseName = "mydb";
+ const std::string collectionName = "mycoll";
+ const NamespaceString nss(databaseName, collectionName);
+
+ class DatabaseTaskTest : public TaskRunnerTest {
+ public:
+ OperationContext* createOperationContext() const override;
+ };
+
+ OperationContext* DatabaseTaskTest::createOperationContext() const {
+ return new OperationContextReplMock();
+ }
+
+ TEST_F(DatabaseTaskTest, TaskRunnerErrorStatus) {
+ // Should not attempt to acquire lock on error status from task runner.
+ auto task = [](OperationContext* txn, const Status& status) {
+ ASSERT_FALSE(txn);
+ ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
+ return TaskRunner::NextAction::kInvalid;
+ };
+ auto testLockTask = [](DatabaseTask::Task task) {
+ ASSERT_TRUE(TaskRunner::NextAction::kInvalid ==
+ task(nullptr, Status(ErrorCodes::BadValue, "")));
+ };
+ testLockTask(DatabaseTask::makeGlobalExclusiveLockTask(task));
+ testLockTask(DatabaseTask::makeDatabaseLockTask(task, databaseName, MODE_X));
+ testLockTask(DatabaseTask::makeCollectionLockTask(task, nss, MODE_X));
+ }
+
+ TEST_F(DatabaseTaskTest, RunGlobalExclusiveLockTask) {
+ boost::mutex mutex;
+ bool called = false;
+ OperationContext* txn = nullptr;
+ bool lockIsW = false;
+ Status status = getDefaultStatus();
+ // Task returning 'void' implies NextAction::NoAction.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ called = true;
+ txn = theTxn;
+ lockIsW = txn->lockState()->isW();
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(DatabaseTask::makeGlobalExclusiveLockTask(task));
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT(txn);
+ ASSERT_TRUE(lockIsW);
+ ASSERT_OK(status);
+ }
+
+ void _testRunDatabaseLockTask(DatabaseTaskTest& test, LockMode mode) {
+ boost::mutex mutex;
+ bool called = false;
+ OperationContext* txn = nullptr;
+ bool isDatabaseLockedForMode = false;
+ Status status = test.getDefaultStatus();
+ // Task returning 'void' implies NextAction::NoAction.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ called = true;
+ txn = theTxn;
+ isDatabaseLockedForMode = txn->lockState()->isDbLockedForMode(databaseName, mode);
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ test.getTaskRunner().schedule(
+ DatabaseTask::makeDatabaseLockTask(task, databaseName, mode));
+ test.getThreadPool().join();
+ ASSERT_FALSE(test.getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT(txn);
+ ASSERT_TRUE(isDatabaseLockedForMode);
+ ASSERT_OK(status);
+ }
+
+ TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeX) {
+ _testRunDatabaseLockTask(*this, MODE_X);
+ }
+
+ TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeS) {
+ _testRunDatabaseLockTask(*this, MODE_S);
+ }
+
+ TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIX) {
+ _testRunDatabaseLockTask(*this, MODE_IX);
+ }
+
+ TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIS) {
+ _testRunDatabaseLockTask(*this, MODE_IS);
+ }
+
+ void _testRunCollectionLockTask(DatabaseTaskTest& test, LockMode mode) {
+ boost::mutex mutex;
+ bool called = false;
+ OperationContext* txn = nullptr;
+ bool isCollectionLockedForMode = false;
+ Status status = test.getDefaultStatus();
+ // Task returning 'void' implies NextAction::NoAction.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ called = true;
+ txn = theTxn;
+ isCollectionLockedForMode =
+ txn->lockState()->isCollectionLockedForMode(nss.toString(), mode);
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ test.getTaskRunner().schedule(
+ DatabaseTask::makeCollectionLockTask(task, nss, mode));
+ test.getThreadPool().join();
+ ASSERT_FALSE(test.getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT(txn);
+ ASSERT_TRUE(isCollectionLockedForMode);
+ ASSERT_OK(status);
+ }
+
+ TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeX) {
+ _testRunCollectionLockTask(*this, MODE_X);
+ }
+
+ TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeS) {
+ _testRunCollectionLockTask(*this, MODE_S);
+ }
+
+ TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIX) {
+ _testRunCollectionLockTask(*this, MODE_IX);
+ }
+
+ TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIS) {
+ _testRunCollectionLockTask(*this, MODE_IS);
+ }
+
+} // namespace
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
new file mode 100644
index 00000000000..f1b54c295d0
--- /dev/null
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/task_runner.h"
+
+#include <boost/thread/locks.hpp>
+#include <memory>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+ /**
+ * Runs a single task runner task.
+ * Any exceptions thrown by the task will be logged and converted into a
+ * next action of kCancel.
+ */
+ TaskRunner::NextAction runSingleTask(const TaskRunner::Task& task,
+ OperationContext* txn,
+ const Status& status) {
+ try {
+ return task(txn, status);
+ }
+ catch (...) {
+ log() << "Unhandled exception in task runner: " << exceptionToStatus();
+ }
+ return TaskRunner::NextAction::kCancel;
+ }
+
+} // namespace
+
+ // static
+ TaskRunner::Task TaskRunner::makeCancelTask() {
+ return [](OperationContext* txn, const Status& status) {
+ return NextAction::kCancel;
+ };
+ }
+
+ TaskRunner::TaskRunner(threadpool::ThreadPool* threadPool,
+ const CreateOperationContextFn& createOperationContext)
+ : _threadPool(threadPool),
+ _createOperationContext(createOperationContext),
+ _active(false),
+ _cancelRequested(false) {
+
+ uassert(ErrorCodes::BadValue, "null thread pool", threadPool);
+ uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext);
+ }
+
+ TaskRunner::~TaskRunner() {
+ try {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ if (!_active) {
+ return;
+ }
+ _cancelRequested = true;
+ _condition.notify_all();
+ while (_active) {
+ _condition.wait(lk);
+ }
+ }
+ catch (...) {
+ error() << "unexpected exception destroying task runner: " << exceptionToStatus();
+ }
+ }
+
+ std::string TaskRunner::getDiagnosticString() const {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ str::stream output;
+ output << "TaskRunner";
+ output << " scheduled tasks: " << _tasks.size();
+ output << " active: " << _active;
+ output << " cancel requested: " << _cancelRequested;
+ return output;
+ }
+
+ bool TaskRunner::isActive() const {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ return _active;
+ }
+
+ void TaskRunner::schedule(const Task& task) {
+ invariant(task);
+
+ boost::lock_guard<boost::mutex> lk(_mutex);
+
+ _tasks.push_back(task);
+ _condition.notify_all();
+
+ if (_active) {
+ return;
+ }
+
+ _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this));
+
+ _active = true;
+ _cancelRequested = false;
+ }
+
+ void TaskRunner::cancel() {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ _cancelRequested = true;
+ _condition.notify_all();
+ }
+
+ void TaskRunner::_runTasks() {
+ std::unique_ptr<OperationContext> txn;
+
+ while (Task task = _waitForNextTask()) {
+ if (!txn) {
+ txn.reset(_createOperationContext());
+ }
+
+ NextAction nextAction = runSingleTask(task, txn.get(), Status::OK());
+
+ if (nextAction != NextAction::kKeepOperationContext) {
+ txn.reset();
+ }
+
+ if (nextAction == NextAction::kCancel) {
+ break;
+ }
+ // Release thread back to pool after disposing if no scheduled tasks in queue.
+ if (nextAction == NextAction::kDisposeOperationContext ||
+ nextAction == NextAction::kInvalid) {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ if (_tasks.empty()) {
+ _finishRunTasks_inlock();
+ return;
+ }
+ }
+ }
+ txn.reset();
+
+ std::list<Task> tasks;
+ {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ tasks.swap(_tasks);
+ }
+
+ // Cancel remaining tasks with a CallbackCanceled status.
+ for (auto task : tasks) {
+ runSingleTask(task, nullptr, Status(ErrorCodes::CallbackCanceled,
+ "this task has been canceled by a previously invoked task"));
+ }
+
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ _finishRunTasks_inlock();
+ }
+
+ void TaskRunner::_finishRunTasks_inlock() {
+ _active = false;
+ _cancelRequested = false;
+ _condition.notify_all();
+ }
+
+ TaskRunner::Task TaskRunner::_waitForNextTask() {
+ boost::unique_lock<boost::mutex> lk(_mutex);
+
+ while (_tasks.empty() && !_cancelRequested) {
+ _condition.wait(lk);
+ }
+
+ if (_cancelRequested) {
+ return Task();
+ }
+
+ Task task = _tasks.front();
+ _tasks.pop_front();
+ return task;
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h
new file mode 100644
index 00000000000..fb7985df7ca
--- /dev/null
+++ b/src/mongo/db/repl/task_runner.h
@@ -0,0 +1,170 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+#include <list>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+
+ class OperationContext;
+ class Status;
+
+namespace threadpool {
+
+ class ThreadPool;
+
+} // namespace threadpool
+
+namespace repl {
+
+ class TaskRunner {
+ MONGO_DISALLOW_COPYING(TaskRunner);
+ public:
+
+ /**
+ * Represents next steps of task runner.
+ */
+ enum class NextAction {
+ kInvalid=0,
+ kDisposeOperationContext=1,
+ kKeepOperationContext=2,
+ kCancel=3,
+ };
+
+ using CreateOperationContextFn = stdx::function<OperationContext*()>;
+ using Task = stdx::function<NextAction (OperationContext*, const Status&)>;
+
+ /**
+ * Creates a Task returning kCancel. This is useful in shutting down the task runner after
+ * running a series of tasks.
+ *
+ * Without a cancellation task, the client would need to coordinate the completion of the
+ * last task with calling cancel() on the task runner.
+ */
+ static Task makeCancelTask();
+
+ TaskRunner(threadpool::ThreadPool* threadPool,
+ const CreateOperationContextFn& createOperationContext);
+
+ virtual ~TaskRunner();
+
+ /**
+ * Returns diagnostic information.
+ */
+ std::string getDiagnosticString() const;
+
+ /**
+ * Returns true if there are any scheduled or actively running tasks.
+ */
+ bool isActive() const;
+
+ /**
+ * Schedules a task to be run by the task runner. Tasks are run in the same order that they
+ * are scheduled.
+ *
+ * This transitions the task runner to an active state.
+ *
+ * The task runner creates an operation context using '_createOperationContext'
+ * prior to running a scheduled task. Depending on the NextAction returned from the
+ * task, operation contexts may be shared between consecutive tasks invoked by the task
+ * runner.
+ *
+ * On completion, each task is expected to return a NextAction to the task runner.
+ *
+ * If the task returns kDisposeOperationContext, the task runner destroys the operation
+ * context. The next task to be invoked will receive a new operation context.
+ *
+ * If the task returns kKeepOperationContext, the task runner will retain the operation
+ * context to pass to the next task in the queue.
+ *
+ * If the task returns kCancel, the task runner will destroy the operation context and
+ * cancel the remaining tasks (each task will be invoked with a status containing the
+ * code ErrorCodes::CallbackCanceled). After all the tasks have been canceled, the task
+ * runner will become inactive.
+ *
+ * If the task returns kInvalid, this NextAction will be handled in the same way as
+ * kDisposeOperationContext.
+ *
+ * If the status passed to the task is not OK, the task should not proceed and return
+ * immediately. This is usually the case when the task runner is canceled. Accessing the
+ * operation context in the task will result in undefined behavior.
+ */
+ void schedule(const Task& task);
+
+ /**
+ * If there is a task that is already running, allows the task to run to completion.
+ * Cancels all scheduled tasks that have not been run. Canceled tasks will still be
+ * invoked with a status containing the code ErrorCodes::CallbackCanceled.
+ * After all active tasks have completed and unscheduled tasks have been canceled, the
+ * task runner will go into an inactive state.
+ *
+ * It is a no-op to call cancel() before scheduling any tasks.
+ */
+ void cancel();
+
+ private:
+
+ /**
+ * Runs tasks in a loop.
+ * Loop exits when any of the tasks returns a non-kContinue next action.
+ */
+ void _runTasks();
+ void _finishRunTasks_inlock();
+
+ /**
+ * Waits for next scheduled task to be added to queue.
+ * Returns null task when task runner is stopped.
+ */
+ Task _waitForNextTask();
+
+ threadpool::ThreadPool* _threadPool;
+ CreateOperationContextFn _createOperationContext;
+
+ // Protects member data of this TaskRunner.
+ mutable boost::mutex _mutex;
+
+ boost::condition _condition;
+
+ // _active is true when there are scheduled tasks in the task queue or
+ // when a task is being run by the task runner.
+ bool _active;
+
+ bool _cancelRequested;
+
+ // FIFO queue of scheduled tasks
+ std::list<Task> _tasks;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
new file mode 100644
index 00000000000..38166cf706f
--- /dev/null
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -0,0 +1,339 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <boost/thread/lock_types.hpp>
+#include <vector>
+
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/db/repl/task_runner_test_fixture.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace {
+
+ using namespace mongo;
+ using namespace mongo::repl;
+
+ using Task = TaskRunner::Task;
+
+ TEST_F(TaskRunnerTest, InvalidConstruction) {
+ // Null thread pool.
+ ASSERT_THROWS(TaskRunner(nullptr, []() -> OperationContext* { return nullptr; }),
+ UserException);
+
+ // Null function for creating operation contexts.
+ ASSERT_THROWS(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()),
+ UserException);
+ }
+
+ TEST_F(TaskRunnerTest, GetDiagnosticString) {
+ ASSERT_FALSE(getTaskRunner().getDiagnosticString().empty());
+ }
+
+ TEST_F(TaskRunnerTest, CallbackValues) {
+ boost::mutex mutex;
+ bool called = false;
+ OperationContext* txn = nullptr;
+ Status status = getDefaultStatus();
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ called = true;
+ txn = theTxn;
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(task);
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT(txn);
+ ASSERT_OK(status);
+ }
+
+ TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) {
+ resetTaskRunner(new TaskRunner(&getThreadPool(), []() -> OperationContext* {
+ return nullptr;
+ }));
+ boost::mutex mutex;
+ bool called = false;
+ OperationContextNoop opCtxNoop;
+ OperationContext* txn = &opCtxNoop;
+ Status status = getDefaultStatus();
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ called = true;
+ txn = theTxn;
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(task);
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT_FALSE(txn);
+ ASSERT_OK(status);
+ }
+
+ std::vector<int> _testRunTaskTwice(TaskRunnerTest& test,
+ TaskRunner::NextAction nextAction,
+ stdx::function<void(const Task& task)> schedule) {
+ boost::mutex mutex;
+ int i = 0;
+ OperationContext* txn[2] = {nullptr, nullptr};
+ int txnId[2] = {-100, -100};
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ int j = i++;
+ if (j >= 2) {
+ return TaskRunner::NextAction::kInvalid;
+ }
+ txn[j] = theTxn;
+ txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]);
+ TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel;
+ return result;
+ };
+ schedule(task);
+ ASSERT_TRUE(test.getTaskRunner().isActive());
+ schedule(task);
+ test.getThreadPool().join();
+ ASSERT_FALSE(test.getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_EQUALS(2, i);
+ ASSERT(txn[0]);
+ ASSERT(txn[1]);
+ ASSERT_NOT_LESS_THAN(txnId[0], 0);
+ ASSERT_NOT_LESS_THAN(txnId[1], 0);
+ return {txnId[0], txnId[1]};
+ }
+
+ std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) {
+ auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); };
+ return _testRunTaskTwice(test, nextAction, schedule);
+ }
+
+ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) {
+ std::vector<int> txnId =
+ _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext);
+ ASSERT_NOT_EQUALS(txnId[0], txnId[1]);
+ }
+
+ // Joining thread pool before scheduling first task has no effect.
+ // Joining thread pool before scheduling second task ensures that task runner releases
+ // thread back to pool after disposing of operation context.
+ TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) {
+ auto schedule = [this](const Task& task) {
+ getThreadPool().join();
+ getTaskRunner().schedule(task);
+ };
+ std::vector<int> txnId =
+ _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule);
+ ASSERT_NOT_EQUALS(txnId[0], txnId[1]);
+ }
+
+ TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) {
+ std::vector<int> txnId =
+ _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext);
+ ASSERT_EQUALS(txnId[0], txnId[1]);
+ }
+
+ TEST_F(TaskRunnerTest, SkipSecondTask) {
+ boost::mutex mutex;
+ int i = 0;
+ OperationContext* txn[2] = {nullptr, nullptr};
+ Status status[2] = {getDefaultStatus(), getDefaultStatus()};
+ boost::condition condition;
+ bool schedulingDone = false;
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::unique_lock<boost::mutex> lk(mutex);
+ int j = i++;
+ if (j >= 2) {
+ return TaskRunner::NextAction::kCancel;
+ }
+ txn[j] = theTxn;
+ status[j] = theStatus;
+
+ // Wait for the test code to schedule the second task.
+ while (!schedulingDone) {
+ condition.wait(lk);
+ }
+
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ getTaskRunner().schedule(task);
+ {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ schedulingDone = true;
+ condition.notify_all();
+ }
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_EQUALS(2, i);
+ ASSERT(txn[0]);
+ ASSERT_OK(status[0]);
+ ASSERT_FALSE(txn[1]);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code());
+ }
+
+ TEST_F(TaskRunnerTest, FirstTaskThrowsException) {
+ boost::mutex mutex;
+ int i = 0;
+ OperationContext* txn[2] = {nullptr, nullptr};
+ Status status[2] = {getDefaultStatus(), getDefaultStatus()};
+ boost::condition condition;
+ bool schedulingDone = false;
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::unique_lock<boost::mutex> lk(mutex);
+ int j = i++;
+ if (j >= 2) {
+ return TaskRunner::NextAction::kCancel;
+ }
+ txn[j] = theTxn;
+ status[j] = theStatus;
+
+ // Wait for the test code to schedule the second task.
+ while (!schedulingDone) {
+ condition.wait(lk);
+ }
+
+ // Throwing an exception from the first task should cancel
+ // unscheduled tasks and make the task runner inactive.
+ // When the second (canceled) task throws an exception, it should be ignored.
+ uassert(ErrorCodes::OperationFailed, "task failure", false);
+
+ // not reached.
+ invariant(false);
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ getTaskRunner().schedule(task);
+ {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ schedulingDone = true;
+ condition.notify_all();
+ }
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_EQUALS(2, i);
+ ASSERT(txn[0]);
+ ASSERT_OK(status[0]);
+ ASSERT_FALSE(txn[1]);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code());
+ }
+
+ TEST_F(TaskRunnerTest, Cancel) {
+ boost::mutex mutex;
+ boost::condition condition;
+ Status status = getDefaultStatus();
+ bool taskRunning = false;
+
+ // Running this task causes the task runner to wait for another task that
+ // is never scheduled.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ status = theStatus;
+ taskRunning = true;
+ condition.notify_all();
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+
+ // Calling cancel() before schedule() has no effect.
+ // The task should still be invoked with a successful status.
+ getTaskRunner().cancel();
+
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ {
+ boost::unique_lock<boost::mutex> lk(mutex);
+ while (!taskRunning) {
+ condition.wait(lk);
+ }
+ }
+
+ // It is fine to call cancel() multiple times.
+ getTaskRunner().cancel();
+ getTaskRunner().cancel();
+
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ // This status will not be OK if canceling the task runner
+ // before scheduling the task results in the task being canceled.
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_OK(status);
+ }
+
+ TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) {
+ boost::mutex mutex;
+ boost::condition condition;
+ Status status = getDefaultStatus();
+ bool taskRunning = false;
+
+ // Running this task causes the task runner to wait for another task that
+ // is never scheduled.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ boost::lock_guard<boost::mutex> lk(mutex);
+ status = theStatus;
+ taskRunning = true;
+ condition.notify_all();
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ {
+ boost::unique_lock<boost::mutex> lk(mutex);
+ while (!taskRunning) {
+ condition.wait(lk);
+ }
+ }
+
+ destroyTaskRunner();
+
+ getThreadPool().join();
+
+ // This status will not be OK if canceling the task runner
+ // before scheduling the task results in the task being canceled.
+ boost::lock_guard<boost::mutex> lk(mutex);
+ ASSERT_OK(status);
+ }
+
+} // namespace
diff --git a/src/mongo/db/repl/task_runner_test_fixture.cpp b/src/mongo/db/repl/task_runner_test_fixture.cpp
new file mode 100644
index 00000000000..e5134b21ec3
--- /dev/null
+++ b/src/mongo/db/repl/task_runner_test_fixture.cpp
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/task_runner_test_fixture.h"
+
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace repl {
+
+ using namespace mongo;
+ using namespace mongo::repl;
+
+namespace {
+
+ const int kNumThreads = 3;
+
+ AtomicInt32 _nextId;
+
+ class TaskRunnerOperationContext : public OperationContextNoop {
+ public:
+ TaskRunnerOperationContext() : _id(_nextId.fetchAndAdd(1)) { }
+ int getId() const { return _id; }
+ private:
+ int _id;
+ };
+
+
+} // namespace
+
+ Status TaskRunnerTest::getDefaultStatus() {
+ return Status(ErrorCodes::InternalError, "Not mutated");
+ }
+
+ int TaskRunnerTest::getOperationContextId(OperationContext* txn) {
+ if (!txn) { return -1; }
+ TaskRunnerOperationContext* taskRunnerTxn = dynamic_cast<TaskRunnerOperationContext*>(txn);
+ if (!taskRunnerTxn) { return -2; }
+ return taskRunnerTxn->getId();
+ }
+
+ OperationContext* TaskRunnerTest::createOperationContext() const {
+ return new TaskRunnerOperationContext();
+ }
+
+ TaskRunner& TaskRunnerTest::getTaskRunner() const {
+ ASSERT(_taskRunner.get());
+ return *_taskRunner;
+ }
+
+ threadpool::ThreadPool& TaskRunnerTest::getThreadPool() const {
+ ASSERT(_threadPool.get());
+ return *_threadPool;
+ }
+
+ void TaskRunnerTest::resetTaskRunner(TaskRunner* taskRunner) {
+ _taskRunner.reset(taskRunner);
+ }
+
+ void TaskRunnerTest::destroyTaskRunner() {
+ _taskRunner.reset();
+ }
+
+ void TaskRunnerTest::setUp() {
+ _threadPool.reset(new ThreadPool(kNumThreads, "TaskRunnerTest-"));
+ resetTaskRunner(new TaskRunner(_threadPool.get(),
+ stdx::bind(&TaskRunnerTest::createOperationContext, this)));
+ }
+
+ void TaskRunnerTest::tearDown() {
+ destroyTaskRunner();
+ _threadPool.reset();
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/task_runner_test_fixture.h b/src/mongo/db/repl/task_runner_test_fixture.h
new file mode 100644
index 00000000000..7b5ed6c030d
--- /dev/null
+++ b/src/mongo/db/repl/task_runner_test_fixture.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/base/status.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace threadpool {
+
+ class ThreadPool;
+
+} // namespace threadpool
+
+namespace repl {
+
+ class TaskRunner;
+
+ /**
+ * Test fixture for tests that require a TaskRunner and/or
+ * ThreadPool.
+ */
+ class TaskRunnerTest : public unittest::Test {
+ public:
+ static Status getDefaultStatus();
+
+ /**
+ * Returns ID of mock operation context returned from createOperationContext().
+ * Returns -1 if txn is null.
+ * Returns -2 if txn cannot be converted to a mock operation context containing an ID.
+ */
+ static int getOperationContextId(OperationContext* txn);
+
+ /**
+ * Returns an noop operation context with an embedded numerical ID.
+ */
+ virtual OperationContext* createOperationContext() const;
+
+ threadpool::ThreadPool& getThreadPool() const;
+ TaskRunner& getTaskRunner() const;
+
+ void resetTaskRunner(TaskRunner* taskRunner);
+ void destroyTaskRunner();
+
+ void setUp() override;
+ void tearDown() override;
+
+ private:
+ std::unique_ptr<threadpool::ThreadPool> _threadPool;
+ std::unique_ptr<TaskRunner> _taskRunner;
+ };
+
+} // namespace repl
+} // namespace mongo