diff options
-rw-r--r-- | src/mongo/db/repl/SConscript | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/database_task.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/repl/database_task.h | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/database_task_test.cpp | 186 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner.cpp | 210 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner.h | 170 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner_test.cpp | 339 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner_test_fixture.cpp | 107 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner_test_fixture.h | 85 |
9 files changed, 1323 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index a8e96c93f40..430a6da8880 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -393,3 +393,53 @@ env.CppUnitTest( 'base_cloner_test_fixture', ], ) + +env.Library( + target='task_runner', + source=[ + 'task_runner.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/util/concurrency/thread_pool', + ], +) + +env.Library( + target='task_runner_test_fixture', + source=[ + 'task_runner_test_fixture.cpp', + ], + LIBDEPS=[ + 'task_runner', + '$BUILD_DIR/mongo/util/decorable', + ], +) + +env.CppUnitTest( + target='task_runner_test', + source='task_runner_test.cpp', + LIBDEPS=[ + 'task_runner_test_fixture', + ], +) + +env.Library( + target='database_task', + source=[ + 'database_task.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', + ], +) + +env.CppUnitTest( + target='database_task_test', + source='database_task_test.cpp', + LIBDEPS=[ + 'database_task', + 'replmocks', + 'task_runner_test_fixture', + ], +) diff --git a/src/mongo/db/repl/database_task.cpp b/src/mongo/db/repl/database_task.cpp new file mode 100644 index 00000000000..83a6678e93a --- /dev/null +++ b/src/mongo/db/repl/database_task.cpp @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/database_task.h" +#include "mongo/platform/compiler.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace repl { + + // static + DatabaseTask::Task DatabaseTask::makeGlobalExclusiveLockTask(const Task& task) { + invariant(task); + DatabaseTask::Task newTask = [task](OperationContext* txn, const Status& status) { + if (!status.isOK()) { + return task(txn, status); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lock(txn->lockState()); + return task(txn, status); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "globalExclusiveLockTask", "global"); + MONGO_COMPILER_UNREACHABLE; + }; + return newTask; + } + + // static + DatabaseTask::Task DatabaseTask::makeDatabaseLockTask(const Task& task, + const std::string& databaseName, + LockMode mode) { + invariant(task); + DatabaseTask::Task newTask = [=](OperationContext* txn, const Status& status) { + if (!status.isOK()) { + return task(txn, status); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + LockMode permissiveLockMode = isSharedLockMode(mode) ? MODE_IS : MODE_IX; + ScopedTransaction transaction(txn, permissiveLockMode); + Lock::DBLock lock(txn->lockState(), databaseName, mode); + return task(txn, status); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "databaseLockTask", databaseName); + MONGO_COMPILER_UNREACHABLE; + }; + return newTask; + } + + // static + DatabaseTask::Task DatabaseTask::makeCollectionLockTask(const Task& task, + const NamespaceString& nss, + LockMode mode) { + invariant(task); + DatabaseTask::Task newTask = [=](OperationContext* txn, const Status& status) { + if (!status.isOK()) { + return task(txn, status); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + LockMode permissiveLockMode = isSharedLockMode(mode) ? MODE_IS : MODE_IX; + ScopedTransaction transaction(txn, permissiveLockMode); + Lock::DBLock lock(txn->lockState(), nss.db(), permissiveLockMode); + Lock::CollectionLock collectionLock(txn->lockState(), nss.toString(), mode); + return task(txn, status); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "collectionLockTask", nss.toString()); + MONGO_COMPILER_UNREACHABLE; + }; + return newTask; + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/database_task.h b/src/mongo/db/repl/database_task.h new file mode 100644 index 00000000000..5a4f09dfdae --- /dev/null +++ b/src/mongo/db/repl/database_task.h @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/db/concurrency/lock_manager_defs.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/db/namespace_string.h" + +namespace mongo { + + class OperationContext; + +namespace repl { + + class DatabaseTask{ + private: + DatabaseTask(); + + public: + + using Task = TaskRunner::Task; + + /** + * Creates a task wrapper that runs the target task inside a global exclusive lock. + */ + static Task makeGlobalExclusiveLockTask(const Task& task); + + /** + * Creates a task wrapper that runs the target task inside a database lock. + */ + static Task makeDatabaseLockTask(const Task& task, + const std::string& databaseName, + LockMode mode); + + /** + * Creates a task wrapper that runs the target task inside a collection lock. + * Task acquires database lock before attempting to lock collection. Do not + * use in combination with makeDatabaseLockTask(). + */ + static Task makeCollectionLockTask(const Task& task, + const NamespaceString& nss, + LockMode mode); + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp new file mode 100644 index 00000000000..b9a37c9ca73 --- /dev/null +++ b/src/mongo/db/repl/database_task_test.cpp @@ -0,0 +1,186 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/thread/lock_types.hpp> + +#include "mongo/db/repl/database_task.h" +#include "mongo/db/repl/operation_context_repl_mock.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/db/repl/task_runner_test_fixture.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace { + + using namespace mongo; + using namespace mongo::repl; + + const std::string databaseName = "mydb"; + const std::string collectionName = "mycoll"; + const NamespaceString nss(databaseName, collectionName); + + class DatabaseTaskTest : public TaskRunnerTest { + public: + OperationContext* createOperationContext() const override; + }; + + OperationContext* DatabaseTaskTest::createOperationContext() const { + return new OperationContextReplMock(); + } + + TEST_F(DatabaseTaskTest, TaskRunnerErrorStatus) { + // Should not attempt to acquire lock on error status from task runner. + auto task = [](OperationContext* txn, const Status& status) { + ASSERT_FALSE(txn); + ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); + return TaskRunner::NextAction::kInvalid; + }; + auto testLockTask = [](DatabaseTask::Task task) { + ASSERT_TRUE(TaskRunner::NextAction::kInvalid == + task(nullptr, Status(ErrorCodes::BadValue, ""))); + }; + testLockTask(DatabaseTask::makeGlobalExclusiveLockTask(task)); + testLockTask(DatabaseTask::makeDatabaseLockTask(task, databaseName, MODE_X)); + testLockTask(DatabaseTask::makeCollectionLockTask(task, nss, MODE_X)); + } + + TEST_F(DatabaseTaskTest, RunGlobalExclusiveLockTask) { + boost::mutex mutex; + bool called = false; + OperationContext* txn = nullptr; + bool lockIsW = false; + Status status = getDefaultStatus(); + // Task returning 'void' implies NextAction::NoAction. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + called = true; + txn = theTxn; + lockIsW = txn->lockState()->isW(); + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(DatabaseTask::makeGlobalExclusiveLockTask(task)); + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT(txn); + ASSERT_TRUE(lockIsW); + ASSERT_OK(status); + } + + void _testRunDatabaseLockTask(DatabaseTaskTest& test, LockMode mode) { + boost::mutex mutex; + bool called = false; + OperationContext* txn = nullptr; + bool isDatabaseLockedForMode = false; + Status status = test.getDefaultStatus(); + // Task returning 'void' implies NextAction::NoAction. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + called = true; + txn = theTxn; + isDatabaseLockedForMode = txn->lockState()->isDbLockedForMode(databaseName, mode); + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + test.getTaskRunner().schedule( + DatabaseTask::makeDatabaseLockTask(task, databaseName, mode)); + test.getThreadPool().join(); + ASSERT_FALSE(test.getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT(txn); + ASSERT_TRUE(isDatabaseLockedForMode); + ASSERT_OK(status); + } + + TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeX) { + _testRunDatabaseLockTask(*this, MODE_X); + } + + TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeS) { + _testRunDatabaseLockTask(*this, MODE_S); + } + + TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIX) { + _testRunDatabaseLockTask(*this, MODE_IX); + } + + TEST_F(DatabaseTaskTest, RunDatabaseLockTaskModeIS) { + _testRunDatabaseLockTask(*this, MODE_IS); + } + + void _testRunCollectionLockTask(DatabaseTaskTest& test, LockMode mode) { + boost::mutex mutex; + bool called = false; + OperationContext* txn = nullptr; + bool isCollectionLockedForMode = false; + Status status = test.getDefaultStatus(); + // Task returning 'void' implies NextAction::NoAction. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + called = true; + txn = theTxn; + isCollectionLockedForMode = + txn->lockState()->isCollectionLockedForMode(nss.toString(), mode); + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + test.getTaskRunner().schedule( + DatabaseTask::makeCollectionLockTask(task, nss, mode)); + test.getThreadPool().join(); + ASSERT_FALSE(test.getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT(txn); + ASSERT_TRUE(isCollectionLockedForMode); + ASSERT_OK(status); + } + + TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeX) { + _testRunCollectionLockTask(*this, MODE_X); + } + + TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeS) { + _testRunCollectionLockTask(*this, MODE_S); + } + + TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIX) { + _testRunCollectionLockTask(*this, MODE_IX); + } + + TEST_F(DatabaseTaskTest, RunCollectionLockTaskModeIS) { + _testRunCollectionLockTask(*this, MODE_IS); + } + +} // namespace diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp new file mode 100644 index 00000000000..f1b54c295d0 --- /dev/null +++ b/src/mongo/db/repl/task_runner.cpp @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/task_runner.h" + +#include <boost/thread/locks.hpp> +#include <memory> + +#include "mongo/db/operation_context.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + +namespace { + + /** + * Runs a single task runner task. + * Any exceptions thrown by the task will be logged and converted into a + * next action of kCancel. + */ + TaskRunner::NextAction runSingleTask(const TaskRunner::Task& task, + OperationContext* txn, + const Status& status) { + try { + return task(txn, status); + } + catch (...) { + log() << "Unhandled exception in task runner: " << exceptionToStatus(); + } + return TaskRunner::NextAction::kCancel; + } + +} // namespace + + // static + TaskRunner::Task TaskRunner::makeCancelTask() { + return [](OperationContext* txn, const Status& status) { + return NextAction::kCancel; + }; + } + + TaskRunner::TaskRunner(threadpool::ThreadPool* threadPool, + const CreateOperationContextFn& createOperationContext) + : _threadPool(threadPool), + _createOperationContext(createOperationContext), + _active(false), + _cancelRequested(false) { + + uassert(ErrorCodes::BadValue, "null thread pool", threadPool); + uassert(ErrorCodes::BadValue, "null operation context factory", createOperationContext); + } + + TaskRunner::~TaskRunner() { + try { + boost::unique_lock<boost::mutex> lk(_mutex); + if (!_active) { + return; + } + _cancelRequested = true; + _condition.notify_all(); + while (_active) { + _condition.wait(lk); + } + } + catch (...) { + error() << "unexpected exception destroying task runner: " << exceptionToStatus(); + } + } + + std::string TaskRunner::getDiagnosticString() const { + boost::lock_guard<boost::mutex> lk(_mutex); + str::stream output; + output << "TaskRunner"; + output << " scheduled tasks: " << _tasks.size(); + output << " active: " << _active; + output << " cancel requested: " << _cancelRequested; + return output; + } + + bool TaskRunner::isActive() const { + boost::lock_guard<boost::mutex> lk(_mutex); + return _active; + } + + void TaskRunner::schedule(const Task& task) { + invariant(task); + + boost::lock_guard<boost::mutex> lk(_mutex); + + _tasks.push_back(task); + _condition.notify_all(); + + if (_active) { + return; + } + + _threadPool->schedule(stdx::bind(&TaskRunner::_runTasks, this)); + + _active = true; + _cancelRequested = false; + } + + void TaskRunner::cancel() { + boost::lock_guard<boost::mutex> lk(_mutex); + _cancelRequested = true; + _condition.notify_all(); + } + + void TaskRunner::_runTasks() { + std::unique_ptr<OperationContext> txn; + + while (Task task = _waitForNextTask()) { + if (!txn) { + txn.reset(_createOperationContext()); + } + + NextAction nextAction = runSingleTask(task, txn.get(), Status::OK()); + + if (nextAction != NextAction::kKeepOperationContext) { + txn.reset(); + } + + if (nextAction == NextAction::kCancel) { + break; + } + // Release thread back to pool after disposing if no scheduled tasks in queue. + if (nextAction == NextAction::kDisposeOperationContext || + nextAction == NextAction::kInvalid) { + boost::lock_guard<boost::mutex> lk(_mutex); + if (_tasks.empty()) { + _finishRunTasks_inlock(); + return; + } + } + } + txn.reset(); + + std::list<Task> tasks; + { + boost::lock_guard<boost::mutex> lk(_mutex); + tasks.swap(_tasks); + } + + // Cancel remaining tasks with a CallbackCanceled status. + for (auto task : tasks) { + runSingleTask(task, nullptr, Status(ErrorCodes::CallbackCanceled, + "this task has been canceled by a previously invoked task")); + } + + boost::lock_guard<boost::mutex> lk(_mutex); + _finishRunTasks_inlock(); + } + + void TaskRunner::_finishRunTasks_inlock() { + _active = false; + _cancelRequested = false; + _condition.notify_all(); + } + + TaskRunner::Task TaskRunner::_waitForNextTask() { + boost::unique_lock<boost::mutex> lk(_mutex); + + while (_tasks.empty() && !_cancelRequested) { + _condition.wait(lk); + } + + if (_cancelRequested) { + return Task(); + } + + Task task = _tasks.front(); + _tasks.pop_front(); + return task; + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h new file mode 100644 index 00000000000..fb7985df7ca --- /dev/null +++ b/src/mongo/db/repl/task_runner.h @@ -0,0 +1,170 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/thread/condition.hpp> +#include <boost/thread/mutex.hpp> +#include <list> + +#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/functional.h" + +namespace mongo { + + class OperationContext; + class Status; + +namespace threadpool { + + class ThreadPool; + +} // namespace threadpool + +namespace repl { + + class TaskRunner { + MONGO_DISALLOW_COPYING(TaskRunner); + public: + + /** + * Represents next steps of task runner. + */ + enum class NextAction { + kInvalid=0, + kDisposeOperationContext=1, + kKeepOperationContext=2, + kCancel=3, + }; + + using CreateOperationContextFn = stdx::function<OperationContext*()>; + using Task = stdx::function<NextAction (OperationContext*, const Status&)>; + + /** + * Creates a Task returning kCancel. This is useful in shutting down the task runner after + * running a series of tasks. + * + * Without a cancellation task, the client would need to coordinate the completion of the + * last task with calling cancel() on the task runner. + */ + static Task makeCancelTask(); + + TaskRunner(threadpool::ThreadPool* threadPool, + const CreateOperationContextFn& createOperationContext); + + virtual ~TaskRunner(); + + /** + * Returns diagnostic information. + */ + std::string getDiagnosticString() const; + + /** + * Returns true if there are any scheduled or actively running tasks. + */ + bool isActive() const; + + /** + * Schedules a task to be run by the task runner. Tasks are run in the same order that they + * are scheduled. + * + * This transitions the task runner to an active state. + * + * The task runner creates an operation context using '_createOperationContext' + * prior to running a scheduled task. Depending on the NextAction returned from the + * task, operation contexts may be shared between consecutive tasks invoked by the task + * runner. + * + * On completion, each task is expected to return a NextAction to the task runner. + * + * If the task returns kDisposeOperationContext, the task runner destroys the operation + * context. The next task to be invoked will receive a new operation context. + * + * If the task returns kKeepOperationContext, the task runner will retain the operation + * context to pass to the next task in the queue. + * + * If the task returns kCancel, the task runner will destroy the operation context and + * cancel the remaining tasks (each task will be invoked with a status containing the + * code ErrorCodes::CallbackCanceled). After all the tasks have been canceled, the task + * runner will become inactive. + * + * If the task returns kInvalid, this NextAction will be handled in the same way as + * kDisposeOperationContext. + * + * If the status passed to the task is not OK, the task should not proceed and return + * immediately. This is usually the case when the task runner is canceled. Accessing the + * operation context in the task will result in undefined behavior. + */ + void schedule(const Task& task); + + /** + * If there is a task that is already running, allows the task to run to completion. + * Cancels all scheduled tasks that have not been run. Canceled tasks will still be + * invoked with a status containing the code ErrorCodes::CallbackCanceled. + * After all active tasks have completed and unscheduled tasks have been canceled, the + * task runner will go into an inactive state. + * + * It is a no-op to call cancel() before scheduling any tasks. + */ + void cancel(); + + private: + + /** + * Runs tasks in a loop. + * Loop exits when any of the tasks returns a non-kContinue next action. + */ + void _runTasks(); + void _finishRunTasks_inlock(); + + /** + * Waits for next scheduled task to be added to queue. + * Returns null task when task runner is stopped. + */ + Task _waitForNextTask(); + + threadpool::ThreadPool* _threadPool; + CreateOperationContextFn _createOperationContext; + + // Protects member data of this TaskRunner. + mutable boost::mutex _mutex; + + boost::condition _condition; + + // _active is true when there are scheduled tasks in the task queue or + // when a task is being run by the task runner. + bool _active; + + bool _cancelRequested; + + // FIFO queue of scheduled tasks + std::list<Task> _tasks; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp new file mode 100644 index 00000000000..38166cf706f --- /dev/null +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -0,0 +1,339 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/thread/lock_types.hpp> +#include <vector> + +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/db/repl/task_runner_test_fixture.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace { + + using namespace mongo; + using namespace mongo::repl; + + using Task = TaskRunner::Task; + + TEST_F(TaskRunnerTest, InvalidConstruction) { + // Null thread pool. + ASSERT_THROWS(TaskRunner(nullptr, []() -> OperationContext* { return nullptr; }), + UserException); + + // Null function for creating operation contexts. + ASSERT_THROWS(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()), + UserException); + } + + TEST_F(TaskRunnerTest, GetDiagnosticString) { + ASSERT_FALSE(getTaskRunner().getDiagnosticString().empty()); + } + + TEST_F(TaskRunnerTest, CallbackValues) { + boost::mutex mutex; + bool called = false; + OperationContext* txn = nullptr; + Status status = getDefaultStatus(); + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + called = true; + txn = theTxn; + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(task); + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT(txn); + ASSERT_OK(status); + } + + TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) { + resetTaskRunner(new TaskRunner(&getThreadPool(), []() -> OperationContext* { + return nullptr; + })); + boost::mutex mutex; + bool called = false; + OperationContextNoop opCtxNoop; + OperationContext* txn = &opCtxNoop; + Status status = getDefaultStatus(); + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + called = true; + txn = theTxn; + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(task); + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT_FALSE(txn); + ASSERT_OK(status); + } + + std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, + TaskRunner::NextAction nextAction, + stdx::function<void(const Task& task)> schedule) { + boost::mutex mutex; + int i = 0; + OperationContext* txn[2] = {nullptr, nullptr}; + int txnId[2] = {-100, -100}; + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + int j = i++; + if (j >= 2) { + return TaskRunner::NextAction::kInvalid; + } + txn[j] = theTxn; + txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]); + TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel; + return result; + }; + schedule(task); + ASSERT_TRUE(test.getTaskRunner().isActive()); + schedule(task); + test.getThreadPool().join(); + ASSERT_FALSE(test.getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_EQUALS(2, i); + ASSERT(txn[0]); + ASSERT(txn[1]); + ASSERT_NOT_LESS_THAN(txnId[0], 0); + ASSERT_NOT_LESS_THAN(txnId[1], 0); + return {txnId[0], txnId[1]}; + } + + std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) { + auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); }; + return _testRunTaskTwice(test, nextAction, schedule); + } + + TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { + std::vector<int> txnId = + _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext); + ASSERT_NOT_EQUALS(txnId[0], txnId[1]); + } + + // Joining thread pool before scheduling first task has no effect. + // Joining thread pool before scheduling second task ensures that task runner releases + // thread back to pool after disposing of operation context. + TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) { + auto schedule = [this](const Task& task) { + getThreadPool().join(); + getTaskRunner().schedule(task); + }; + std::vector<int> txnId = + _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule); + ASSERT_NOT_EQUALS(txnId[0], txnId[1]); + } + + TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) { + std::vector<int> txnId = + _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext); + ASSERT_EQUALS(txnId[0], txnId[1]); + } + + TEST_F(TaskRunnerTest, SkipSecondTask) { + boost::mutex mutex; + int i = 0; + OperationContext* txn[2] = {nullptr, nullptr}; + Status status[2] = {getDefaultStatus(), getDefaultStatus()}; + boost::condition condition; + bool schedulingDone = false; + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::unique_lock<boost::mutex> lk(mutex); + int j = i++; + if (j >= 2) { + return TaskRunner::NextAction::kCancel; + } + txn[j] = theTxn; + status[j] = theStatus; + + // Wait for the test code to schedule the second task. + while (!schedulingDone) { + condition.wait(lk); + } + + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + getTaskRunner().schedule(task); + { + boost::lock_guard<boost::mutex> lk(mutex); + schedulingDone = true; + condition.notify_all(); + } + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_EQUALS(2, i); + ASSERT(txn[0]); + ASSERT_OK(status[0]); + ASSERT_FALSE(txn[1]); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code()); + } + + TEST_F(TaskRunnerTest, FirstTaskThrowsException) { + boost::mutex mutex; + int i = 0; + OperationContext* txn[2] = {nullptr, nullptr}; + Status status[2] = {getDefaultStatus(), getDefaultStatus()}; + boost::condition condition; + bool schedulingDone = false; + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::unique_lock<boost::mutex> lk(mutex); + int j = i++; + if (j >= 2) { + return TaskRunner::NextAction::kCancel; + } + txn[j] = theTxn; + status[j] = theStatus; + + // Wait for the test code to schedule the second task. + while (!schedulingDone) { + condition.wait(lk); + } + + // Throwing an exception from the first task should cancel + // unscheduled tasks and make the task runner inactive. + // When the second (canceled) task throws an exception, it should be ignored. + uassert(ErrorCodes::OperationFailed, "task failure", false); + + // not reached. + invariant(false); + return TaskRunner::NextAction::kKeepOperationContext; + }; + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + getTaskRunner().schedule(task); + { + boost::lock_guard<boost::mutex> lk(mutex); + schedulingDone = true; + condition.notify_all(); + } + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_EQUALS(2, i); + ASSERT(txn[0]); + ASSERT_OK(status[0]); + ASSERT_FALSE(txn[1]); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code()); + } + + TEST_F(TaskRunnerTest, Cancel) { + boost::mutex mutex; + boost::condition condition; + Status status = getDefaultStatus(); + bool taskRunning = false; + + // Running this task causes the task runner to wait for another task that + // is never scheduled. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + status = theStatus; + taskRunning = true; + condition.notify_all(); + return TaskRunner::NextAction::kKeepOperationContext; + }; + + // Calling cancel() before schedule() has no effect. + // The task should still be invoked with a successful status. + getTaskRunner().cancel(); + + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + { + boost::unique_lock<boost::mutex> lk(mutex); + while (!taskRunning) { + condition.wait(lk); + } + } + + // It is fine to call cancel() multiple times. + getTaskRunner().cancel(); + getTaskRunner().cancel(); + + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + // This status will not be OK if canceling the task runner + // before scheduling the task results in the task being canceled. + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_OK(status); + } + + TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { + boost::mutex mutex; + boost::condition condition; + Status status = getDefaultStatus(); + bool taskRunning = false; + + // Running this task causes the task runner to wait for another task that + // is never scheduled. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + boost::lock_guard<boost::mutex> lk(mutex); + status = theStatus; + taskRunning = true; + condition.notify_all(); + return TaskRunner::NextAction::kKeepOperationContext; + }; + + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + { + boost::unique_lock<boost::mutex> lk(mutex); + while (!taskRunning) { + condition.wait(lk); + } + } + + destroyTaskRunner(); + + getThreadPool().join(); + + // This status will not be OK if canceling the task runner + // before scheduling the task results in the task being canceled. + boost::lock_guard<boost::mutex> lk(mutex); + ASSERT_OK(status); + } + +} // namespace diff --git a/src/mongo/db/repl/task_runner_test_fixture.cpp b/src/mongo/db/repl/task_runner_test_fixture.cpp new file mode 100644 index 00000000000..e5134b21ec3 --- /dev/null +++ b/src/mongo/db/repl/task_runner_test_fixture.cpp @@ -0,0 +1,107 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/task_runner_test_fixture.h" + +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace repl { + + using namespace mongo; + using namespace mongo::repl; + +namespace { + + const int kNumThreads = 3; + + AtomicInt32 _nextId; + + class TaskRunnerOperationContext : public OperationContextNoop { + public: + TaskRunnerOperationContext() : _id(_nextId.fetchAndAdd(1)) { } + int getId() const { return _id; } + private: + int _id; + }; + + +} // namespace + + Status TaskRunnerTest::getDefaultStatus() { + return Status(ErrorCodes::InternalError, "Not mutated"); + } + + int TaskRunnerTest::getOperationContextId(OperationContext* txn) { + if (!txn) { return -1; } + TaskRunnerOperationContext* taskRunnerTxn = dynamic_cast<TaskRunnerOperationContext*>(txn); + if (!taskRunnerTxn) { return -2; } + return taskRunnerTxn->getId(); + } + + OperationContext* TaskRunnerTest::createOperationContext() const { + return new TaskRunnerOperationContext(); + } + + TaskRunner& TaskRunnerTest::getTaskRunner() const { + ASSERT(_taskRunner.get()); + return *_taskRunner; + } + + threadpool::ThreadPool& TaskRunnerTest::getThreadPool() const { + ASSERT(_threadPool.get()); + return *_threadPool; + } + + void TaskRunnerTest::resetTaskRunner(TaskRunner* taskRunner) { + _taskRunner.reset(taskRunner); + } + + void TaskRunnerTest::destroyTaskRunner() { + _taskRunner.reset(); + } + + void TaskRunnerTest::setUp() { + _threadPool.reset(new ThreadPool(kNumThreads, "TaskRunnerTest-")); + resetTaskRunner(new TaskRunner(_threadPool.get(), + stdx::bind(&TaskRunnerTest::createOperationContext, this))); + } + + void TaskRunnerTest::tearDown() { + destroyTaskRunner(); + _threadPool.reset(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/task_runner_test_fixture.h b/src/mongo/db/repl/task_runner_test_fixture.h new file mode 100644 index 00000000000..7b5ed6c030d --- /dev/null +++ b/src/mongo/db/repl/task_runner_test_fixture.h @@ -0,0 +1,85 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + + class OperationContext; + +namespace threadpool { + + class ThreadPool; + +} // namespace threadpool + +namespace repl { + + class TaskRunner; + + /** + * Test fixture for tests that require a TaskRunner and/or + * ThreadPool. + */ + class TaskRunnerTest : public unittest::Test { + public: + static Status getDefaultStatus(); + + /** + * Returns ID of mock operation context returned from createOperationContext(). + * Returns -1 if txn is null. + * Returns -2 if txn cannot be converted to a mock operation context containing an ID. + */ + static int getOperationContextId(OperationContext* txn); + + /** + * Returns an noop operation context with an embedded numerical ID. + */ + virtual OperationContext* createOperationContext() const; + + threadpool::ThreadPool& getThreadPool() const; + TaskRunner& getTaskRunner() const; + + void resetTaskRunner(TaskRunner* taskRunner); + void destroyTaskRunner(); + + void setUp() override; + void tearDown() override; + + private: + std::unique_ptr<threadpool::ThreadPool> _threadPool; + std::unique_ptr<TaskRunner> _taskRunner; + }; + +} // namespace repl +} // namespace mongo |