summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-03-14 17:55:59 -0400
committerBenety Goh <benety@mongodb.com>2017-03-20 13:36:34 -0400
commit4af23eb406571c1559ac4022c1c2fc27f00ae93f (patch)
treedbbf204972e19ea75f91c14c7e85ef326166dbc7 /src
parent46dc518b4531b21a60675f7b9b1a9e82a50bbc0c (diff)
downloadmongo-4af23eb406571c1559ac4022c1c2fc27f00ae93f.tar.gz
SERVER-28204 added AbstractAsyncComponent
This defines the life cycle for task executor based async components in replication.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/SConscript23
-rw-r--r--src/mongo/db/repl/abstract_async_component.cpp220
-rw-r--r--src/mongo/db/repl/abstract_async_component.h211
-rw-r--r--src/mongo/db/repl/abstract_async_component_test.cpp440
4 files changed, 894 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 1096d248fbd..bd4bfdbbfc6 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -57,6 +57,29 @@ env.Library('rslog',
])
env.Library(
+ target='abstract_async_component',
+ source=[
+ 'abstract_async_component.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
+ ],
+)
+
+env.CppUnitTest(
+ target='abstract_async_component_test',
+ source=[
+ 'abstract_async_component_test.cpp',
+ ],
+ LIBDEPS=[
+ 'abstract_async_component',
+ 'task_executor_mock',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ ],
+)
+
+env.Library(
target='storage_interface',
source=[
'storage_interface.cpp',
diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp
new file mode 100644
index 00000000000..dcf49b39a0b
--- /dev/null
+++ b/src/mongo/db/repl/abstract_async_component.cpp
@@ -0,0 +1,220 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/abstract_async_component.h"
+
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace repl {
+
+AbstractAsyncComponent::AbstractAsyncComponent(executor::TaskExecutor* executor,
+ const std::string& componentName)
+ : _executor(executor), _componentName(componentName) {
+ uassert(ErrorCodes::BadValue, "task executor cannot be null", executor);
+}
+
+executor::TaskExecutor* AbstractAsyncComponent::_getExecutor() {
+ return _executor;
+}
+
+bool AbstractAsyncComponent::isActive() noexcept {
+ stdx::lock_guard<stdx::mutex> lock(*_getMutex());
+ return _isActive_inlock();
+}
+
+bool AbstractAsyncComponent::_isActive_inlock() noexcept {
+ return State::kRunning == _state || State::kShuttingDown == _state;
+}
+
+bool AbstractAsyncComponent::_isShuttingDown() noexcept {
+ stdx::lock_guard<stdx::mutex> lock(*_getMutex());
+ return _isShuttingDown_inlock();
+}
+
+bool AbstractAsyncComponent::_isShuttingDown_inlock() noexcept {
+ return State::kShuttingDown == _state;
+}
+
+Status AbstractAsyncComponent::startup() noexcept {
+ stdx::lock_guard<stdx::mutex> lock(*_getMutex());
+ switch (_state) {
+ case State::kPreStart:
+ _state = State::kRunning;
+ break;
+ case State::kRunning:
+ return Status(ErrorCodes::IllegalOperation,
+ str::stream() << _componentName << " already started");
+ case State::kShuttingDown:
+ return Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << _componentName << " shutting down");
+ case State::kComplete:
+ return Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << _componentName << " completed");
+ }
+
+ auto status = _doStartup_inlock();
+
+ if (!status.isOK()) {
+ _state = State::kComplete;
+ return status;
+ }
+
+ return Status::OK();
+}
+
+void AbstractAsyncComponent::shutdown() noexcept {
+ stdx::lock_guard<stdx::mutex> lock(*_getMutex());
+ switch (_state) {
+ case State::kPreStart:
+ // Transition directly from PreStart to Complete if not started yet.
+ _state = State::kComplete;
+ return;
+ case State::kRunning:
+ _state = State::kShuttingDown;
+ break;
+ case State::kShuttingDown:
+ case State::kComplete:
+ // Nothing to do if we are already in ShuttingDown or Complete state.
+ return;
+ }
+
+ _doShutdown_inlock();
+}
+
+void AbstractAsyncComponent::join() noexcept {
+ stdx::unique_lock<stdx::mutex> lk(*_getMutex());
+ _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); });
+}
+
+AbstractAsyncComponent::State AbstractAsyncComponent::getState_forTest() noexcept {
+ stdx::lock_guard<stdx::mutex> lock(*_getMutex());
+ return _state;
+}
+
+void AbstractAsyncComponent::_transitionToComplete() noexcept {
+ stdx::lock_guard<stdx::mutex> lock(*_getMutex());
+ _transitionToComplete_inlock();
+}
+
+void AbstractAsyncComponent::_transitionToComplete_inlock() noexcept {
+ invariant(State::kComplete != _state);
+ _state = State::kComplete;
+ _stateCondition.notify_all();
+}
+
+Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) {
+ return _checkForShutdownAndConvertStatus_inlock(callbackArgs.status, message);
+}
+
+Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock(
+ const Status& status, const std::string& message) {
+
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ str::stream() << message << ": " << _componentName << " is shutting down");
+ }
+
+ if (!status.isOK()) {
+ return Status(status.code(), message + ": " + status.reason());
+ }
+
+ return Status::OK();
+}
+
+Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock(
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name) {
+ invariant(handle);
+ if (_isShuttingDown_inlock()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ str::stream() << "failed to schedule work " << name << ": " << _componentName
+ << " is shutting down");
+ }
+ auto result = _executor->scheduleWork(work);
+ if (!result.isOK()) {
+ return Status(result.getStatus().code(),
+ str::stream() << "failed to schedule work " << name << ": "
+ << result.getStatus().reason());
+ }
+ *handle = result.getValue();
+ return Status::OK();
+}
+
+Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock(
+ Date_t when,
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name) {
+ invariant(handle);
+ if (_isShuttingDown_inlock()) {
+ return Status(
+ ErrorCodes::CallbackCanceled,
+ str::stream() << "failed to schedule work " << name << " at " << when.toString() << ": "
+ << _componentName
+ << " is shutting down");
+ }
+ auto result = _executor->scheduleWorkAt(when, work);
+ if (!result.isOK()) {
+ return Status(
+ result.getStatus().code(),
+ str::stream() << "failed to schedule work " << name << " at " << when.toString() << ": "
+ << result.getStatus().reason());
+ }
+ *handle = result.getValue();
+ return Status::OK();
+}
+
+void AbstractAsyncComponent::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) {
+ if (!handle) {
+ return;
+ }
+ _executor->cancel(handle);
+}
+
+std::ostream& operator<<(std::ostream& os, const AbstractAsyncComponent::State& state) {
+ switch (state) {
+ case AbstractAsyncComponent::State::kPreStart:
+ return os << "PreStart";
+ case AbstractAsyncComponent::State::kRunning:
+ return os << "Running";
+ case AbstractAsyncComponent::State::kShuttingDown:
+ return os << "ShuttingDown";
+ case AbstractAsyncComponent::State::kComplete:
+ return os << "Complete";
+ }
+ MONGO_UNREACHABLE;
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h
new file mode 100644
index 00000000000..56f79890332
--- /dev/null
+++ b/src/mongo/db/repl/abstract_async_component.h
@@ -0,0 +1,211 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <iosfwd>
+#include <string>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+namespace repl {
+
+/**
+ * This class represents an abstract base class for replication components that run asynchronously
+ * using the executor::TaskExecutor framework. It defines the startup/shutdown semantics with the
+ * added guarantee that components can be run at most once.
+ *
+ * The _state variable in this class is protected by the concrete class's mutex (returned by
+ * _getMutex()).
+ */
+class AbstractAsyncComponent {
+ MONGO_DISALLOW_COPYING(AbstractAsyncComponent);
+
+public:
+ AbstractAsyncComponent(executor::TaskExecutor* executor, const std::string& componentName);
+
+ virtual ~AbstractAsyncComponent() = default;
+
+ /**
+ * Returns true if this component is currently running or in the process of shutting down.
+ */
+ bool isActive() noexcept;
+
+ /**
+ * Starts the component. If the transition from PreStart to Running is allowed, this invokes
+ * _doStartup_inlock() defined in the concrete class. If _doStartup_inlock() fails, this
+ * component will transition to Complete and any restarts after this will be disallowed.
+ */
+ Status startup() noexcept;
+
+ /**
+ * Signals this component to begin shutting down. If the transition from Running to ShuttingDown
+ * is allowed, this invokes _doShutdown_inlock() defined in the concrete class.
+ * Transition directly from PreStart to Complete if not started yet.
+ */
+ void shutdown() noexcept;
+
+ /**
+ * Blocks until inactive.
+ */
+ void join() noexcept;
+
+ /**
+ * State transitions:
+ * PreStart --> Running --> ShuttingDown --> Complete
+ * It is possible to skip intermediate states. For example, calling shutdown() when the
+ * component has not started will transition from PreStart directly to Complete.
+ */
+ enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
+
+ /**
+ * Returns current component state.
+ * For testing only.
+ */
+ State getState_forTest() noexcept;
+
+protected:
+ /**
+ * Returns task executor.
+ */
+ executor::TaskExecutor* _getExecutor();
+
+ /**
+ * Returns true if this component is currently running or in the process of shutting down.
+ */
+ bool _isActive_inlock() noexcept;
+
+ /**
+ * Returns true if this component has received a shutdown request ('_state' is ShuttingDown).
+ */
+ bool _isShuttingDown() noexcept;
+ bool _isShuttingDown_inlock() noexcept;
+
+ /**
+ * Transitions this component to complete and notifies any waiters on '_stateCondition'.
+ * May be called at most once.
+ */
+ void _transitionToComplete() noexcept;
+ void _transitionToComplete_inlock() noexcept;
+
+ /**
+ * Checks the given status (or embedded status inside the callback args) and current component
+ * shutdown state. If the given status is not OK or if we are shutting down, returns a new error
+ * status that should be passed to _finishCallback. The reason in the new error status will
+ * include 'message'.
+ * Otherwise, returns Status::OK().
+ */
+ Status _checkForShutdownAndConvertStatus_inlock(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message);
+ Status _checkForShutdownAndConvertStatus_inlock(const Status& status,
+ const std::string& message);
+
+ /**
+ * Schedules work to be run by the task executor.
+ * Saves handle if work was successfully scheduled.
+ * Returns scheduleWork status (without the handle).
+ */
+ Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name);
+ Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when,
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name);
+
+ /**
+ * Cancels task executor callback handle if not null.
+ */
+ void _cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle);
+
+private:
+ /**
+ * Invoked by startup() to run startup procedure after a successful transition from PreStart to
+ * Running.
+ * Invoked at most once by AbstractAsyncComponent.
+ * May not throw exceptions.
+ *
+ * If _doStartup_inlock() fails, startup() will transition this component from Running to
+ * Complete. Subsequent startup() attempts will return an IllegalOperation error.
+ *
+ * If _doStartup_inlock() succeeds, the component stays in Running (or ShuttingDown if
+ * shutdown() is called) until the component has finished its processing (transtion to
+ * Complete).
+ *
+ * It is the responsibility of the implementation to transition the component state to Complete
+ * by calling _transitionToComplete_inlock() once the component has finished its processing.
+ */
+ virtual Status _doStartup_inlock() noexcept = 0;
+
+ /**
+ * Runs shutdown procedure after a successful transition from Running to ShuttingDown.
+ * Invoked at most once by AbstractAsyncComponent.
+ * May not throw exceptions.
+ */
+ virtual void _doShutdown_inlock() noexcept = 0;
+
+ /**
+ * Returns mutex to guard this component's state variable.
+ */
+ virtual stdx::mutex* _getMutex() noexcept = 0;
+
+private:
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (R) Read-only in concurrent operation; no synchronization required.
+ // (S) Self-synchronizing; access in any way from any context.
+ // (M) Reads and writes guarded by mutex returned by _getMutex().
+
+ // Task executor used to schedule tasks and remote commands.
+ executor::TaskExecutor* const _executor; // (R)
+
+ // Component name used in error messages generated by startup().
+ const std::string _componentName; // (R)
+
+ // Current component state. See comments for State enum class for details.
+ // Protected by mutex in concrete class returned in _getMutex().
+ State _state = State::kPreStart; // (M)
+
+ // Used by _transitionToComplete_inlock() to signal changes in '_state'.
+ mutable stdx::condition_variable _stateCondition; // (S)
+};
+
+/**
+ * Insertion operator for AbstractAsyncComponent::State. Formats state for output stream.
+ * For testing only.
+ */
+std::ostream& operator<<(std::ostream& os, const AbstractAsyncComponent::State& state);
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp
new file mode 100644
index 00000000000..6c2e8cd95f6
--- /dev/null
+++ b/src/mongo/db/repl/abstract_async_component_test.cpp
@@ -0,0 +1,440 @@
+/**
+ * Copyright 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/status.h"
+#include "mongo/db/repl/abstract_async_component.h"
+#include "mongo/db/repl/task_executor_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/stdx/mutex.h"
+
+#include "mongo/unittest/unittest.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+
+/**
+ * Mock implementation of AbstractAsyncComponent that supports returning errors from
+ * _doStartup_inlock() and also tracks if this function has ever been called by
+ * AbstractAsyncComponent.
+ */
+class MockAsyncComponent : public AbstractAsyncComponent {
+public:
+ explicit MockAsyncComponent(executor::TaskExecutor* executor);
+
+ /**
+ * Publicly visible versions of _checkForShutdownAndConvertStatus_inlock() for testing.
+ */
+ Status checkForShutdownAndConvertStatus_forTest(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message);
+ Status checkForShutdownAndConvertStatus_forTest(const Status& status,
+ const std::string& message);
+
+ /**
+ * Publicly visible versions of _scheduleWorkAndSaveHandle_inlock() and
+ * _scheduleWorkAtAndSaveHandle_inlock() for testing.
+ */
+ Status scheduleWorkAndSaveHandle_forTest(const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name);
+
+ /**
+ * Publicly visible version of _scheduleWorkAtAndSaveHandle_inlock() for testing.
+ */
+ Status scheduleWorkAtAndSaveHandle_forTest(Date_t when,
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name);
+
+ /**
+ * Publicly visible version of _cancelHandle_inlock() for testing.
+ */
+ void cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle);
+
+private:
+ Status _doStartup_inlock() noexcept override;
+ void _doShutdown_inlock() noexcept override;
+ stdx::mutex* _getMutex() noexcept override;
+
+ // Used by AbstractAsyncComponent to guard start changes.
+ stdx::mutex _mutex;
+
+public:
+ // Returned by _doStartup_inlock(). Override for testing.
+ Status doStartupResult = Status::OK();
+
+ // Set to true when _doStartup_inlock() is called.
+ bool doStartupCalled = false;
+};
+
+MockAsyncComponent::MockAsyncComponent(executor::TaskExecutor* executor)
+ : AbstractAsyncComponent(executor, "mock component") {}
+
+Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) {
+ return _checkForShutdownAndConvertStatus_inlock(callbackArgs, message);
+}
+
+Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(const Status& status,
+ const std::string& message) {
+ return _checkForShutdownAndConvertStatus_inlock(status, message);
+}
+
+Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest(
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _scheduleWorkAndSaveHandle_inlock(work, handle, name);
+}
+
+Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest(
+ Date_t when,
+ const executor::TaskExecutor::CallbackFn& work,
+ executor::TaskExecutor::CallbackHandle* handle,
+ const std::string& name) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _scheduleWorkAtAndSaveHandle_inlock(when, work, handle, name);
+}
+
+void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _cancelHandle_inlock(handle);
+}
+
+Status MockAsyncComponent::_doStartup_inlock() noexcept {
+ doStartupCalled = true;
+ return doStartupResult;
+}
+
+void MockAsyncComponent::_doShutdown_inlock() noexcept {}
+
+stdx::mutex* MockAsyncComponent::_getMutex() noexcept {
+ return &_mutex;
+}
+
+class AbstractAsyncComponentTest : public executor::ThreadPoolExecutorTest {
+private:
+ void setUp() override;
+ void tearDown() override;
+};
+
+void AbstractAsyncComponentTest::setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+
+ launchExecutorThread();
+}
+
+void AbstractAsyncComponentTest::tearDown() {
+ shutdownExecutorThread();
+ joinExecutorThread();
+
+ executor::ThreadPoolExecutorTest::tearDown();
+}
+
+TEST_F(AbstractAsyncComponentTest, ConstructorThrowsUserAssertionOnNullTaskExecutor) {
+ ASSERT_THROWS_CODE_AND_WHAT(MockAsyncComponent(nullptr),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
+}
+
+TEST_F(AbstractAsyncComponentTest, StateTransitionsToRunningAfterSuccessfulStartup) {
+ MockAsyncComponent component(&getExecutor());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ ASSERT_OK(component.startup());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest());
+ ASSERT_TRUE(component.isActive());
+ ASSERT_TRUE(component.doStartupCalled);
+}
+
+TEST_F(AbstractAsyncComponentTest, StartupReturnsIllegalOperationIfAlreadyActive) {
+ MockAsyncComponent component(&getExecutor());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ ASSERT_OK(component.startup());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest());
+ ASSERT_TRUE(component.isActive());
+ ASSERT_TRUE(component.doStartupCalled);
+
+ component.doStartupCalled = false;
+
+ auto status = component.startup();
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, status);
+ ASSERT_EQUALS("mock component already started", status.reason());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest());
+ ASSERT_TRUE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+}
+
+TEST_F(AbstractAsyncComponentTest, StartupReturnsShutdownInProgressIfComponentIsShuttingDown) {
+ MockAsyncComponent component(&getExecutor());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ ASSERT_OK(component.startup());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest());
+ ASSERT_TRUE(component.isActive());
+ ASSERT_TRUE(component.doStartupCalled);
+
+ component.doStartupCalled = false;
+ component.shutdown();
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest());
+ ASSERT_TRUE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ auto status = component.startup();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status);
+ ASSERT_EQUALS("mock component shutting down", status.reason());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest());
+ ASSERT_TRUE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ StartupTransitionsToCompleteAndPassesThroughErrorFromDoStartupInLock) {
+ MockAsyncComponent component(&getExecutor());
+ component.doStartupResult = {ErrorCodes::OperationFailed, "mock component startup failed"};
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ auto status = component.startup();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
+ ASSERT_EQUALS("mock component startup failed", status.reason());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kComplete, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_TRUE(component.doStartupCalled);
+}
+
+TEST_F(AbstractAsyncComponentTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) {
+ MockAsyncComponent component(&getExecutor());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ component.shutdown();
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kComplete, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+
+ auto status = component.startup();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status);
+ ASSERT_EQUALS("mock component completed", status.reason());
+
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kComplete, component.getState_forTest());
+ ASSERT_FALSE(component.isActive());
+ ASSERT_FALSE(component.doStartupCalled);
+}
+
+executor::TaskExecutor::CallbackArgs statusToCallbackArgs(const Status& status) {
+ return executor::TaskExecutor::CallbackArgs(nullptr, {}, status);
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ CheckForShutdownAndConvertStatusReturnsCallbackCanceledIfComponentIsShuttingDown) {
+ MockAsyncComponent component(&getExecutor());
+ // Skipping checks on component state because these are done in
+ // StartupReturnsShutdownInProgressIfComponentIsShuttingDown.
+ ASSERT_OK(component.startup());
+ component.shutdown();
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest());
+
+ auto status = getDetectableErrorStatus();
+ ASSERT_NOT_EQUALS(ErrorCodes::CallbackCanceled, status);
+ ASSERT_EQUALS(
+ ErrorCodes::CallbackCanceled,
+ component.checkForShutdownAndConvertStatus_forTest(statusToCallbackArgs(status), "mytask"));
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled,
+ component.checkForShutdownAndConvertStatus_forTest(status, "mytask"));
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ CheckForShutdownAndConvertStatusPrependsMessageToReasonInNewErrorStatus) {
+ MockAsyncComponent component(&getExecutor());
+
+ auto status = getDetectableErrorStatus();
+ auto newStatus =
+ component.checkForShutdownAndConvertStatus_forTest(statusToCallbackArgs(status), "mytask");
+ ASSERT_EQUALS(status.code(), newStatus.code());
+ ASSERT_EQUALS("mytask: " + status.reason(), newStatus.reason());
+
+ newStatus = component.checkForShutdownAndConvertStatus_forTest(status, "mytask");
+ ASSERT_EQUALS(status.code(), newStatus.code());
+ ASSERT_EQUALS("mytask: " + status.reason(), newStatus.reason());
+}
+
+TEST_F(AbstractAsyncComponentTest, CheckForShutdownAndConvertStatusPassesThroughSuccessfulStatus) {
+ MockAsyncComponent component(&getExecutor());
+
+ auto status = Status::OK();
+ ASSERT_OK(
+ component.checkForShutdownAndConvertStatus_forTest(statusToCallbackArgs(status), "mytask"));
+ ASSERT_OK(component.checkForShutdownAndConvertStatus_forTest(status, "mytask"));
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ ScheduleWorkAndSaveHandleReturnsCallbackCanceledIfComponentIsShuttingDown) {
+ MockAsyncComponent component(&getExecutor());
+ // Skipping checks on component state because these are done in
+ // StartupReturnsShutdownInProgressIfComponentIsShuttingDown.
+ ASSERT_OK(component.startup());
+ component.shutdown();
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest());
+
+ auto callback = [](const executor::TaskExecutor::CallbackArgs&) {};
+ executor::TaskExecutor::CallbackHandle handle;
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled,
+ component.scheduleWorkAndSaveHandle_forTest(callback, &handle, "mytask"));
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ ScheduleWorkAndSaveHandlePassesThroughErrorFromTaskExecutorScheduleWork) {
+ TaskExecutorMock taskExecutorMock(&getExecutor(),
+ [](const executor::RemoteCommandRequest&) { return false; });
+ MockAsyncComponent component(&taskExecutorMock);
+
+ taskExecutorMock.shouldFailScheduleWork = true;
+
+ auto callback = [](const executor::TaskExecutor::CallbackArgs&) {};
+ executor::TaskExecutor::CallbackHandle handle;
+ ASSERT_EQUALS(ErrorCodes::OperationFailed,
+ component.scheduleWorkAndSaveHandle_forTest(callback, &handle, "mytask"));
+}
+
+TEST_F(AbstractAsyncComponentTest, ScheduleWorkAndSaveHandleSchedulesTaskSuccessfully) {
+ auto executor = &getExecutor();
+ MockAsyncComponent component(executor);
+ auto status = getDetectableErrorStatus();
+ auto callback = [&status](const executor::TaskExecutor::CallbackArgs& callbackArgs) {
+ status = callbackArgs.status;
+ };
+ executor::TaskExecutor::CallbackHandle handle;
+ ASSERT_OK(component.scheduleWorkAndSaveHandle_forTest(callback, &handle, "mytask"));
+ ASSERT_TRUE(handle.isValid());
+ executor->wait(handle);
+ ASSERT_OK(status);
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ ScheduleWorkAtAndSaveHandleReturnsCallbackCanceledIfComponentIsShuttingDown) {
+ MockAsyncComponent component(&getExecutor());
+ // Skipping checks on component state because these are done in
+ // StartupReturnsShutdownInProgressIfComponentIsShuttingDown.
+ ASSERT_OK(component.startup());
+ component.shutdown();
+ ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest());
+
+ auto when = getExecutor().now() + Seconds(1);
+ auto callback = [](const executor::TaskExecutor::CallbackArgs&) {};
+ executor::TaskExecutor::CallbackHandle handle;
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled,
+ component.scheduleWorkAtAndSaveHandle_forTest(when, callback, &handle, "mytask"));
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ ScheduleWorkAtAndSaveHandlePassesThroughErrorFromTaskExecutorScheduleWork) {
+ TaskExecutorMock taskExecutorMock(&getExecutor(),
+ [](const executor::RemoteCommandRequest&) { return false; });
+ MockAsyncComponent component(&taskExecutorMock);
+
+ taskExecutorMock.shouldFailScheduleWorkAt = true;
+
+ auto when = getExecutor().now() + Seconds(1);
+ auto callback = [](const executor::TaskExecutor::CallbackArgs&) {};
+ executor::TaskExecutor::CallbackHandle handle;
+ ASSERT_EQUALS(ErrorCodes::OperationFailed,
+ component.scheduleWorkAtAndSaveHandle_forTest(when, callback, &handle, "mytask"));
+}
+
+TEST_F(AbstractAsyncComponentTest, ScheduleWorkAtAndSaveHandleSchedulesTaskSuccessfully) {
+ auto executor = &getExecutor();
+ MockAsyncComponent component(executor);
+ auto when = executor->now() + Seconds(1);
+ auto status = getDetectableErrorStatus();
+ auto callback = [&status](const executor::TaskExecutor::CallbackArgs& callbackArgs) {
+ status = callbackArgs.status;
+ };
+ executor::TaskExecutor::CallbackHandle handle;
+ ASSERT_OK(component.scheduleWorkAtAndSaveHandle_forTest(when, callback, &handle, "mytask"));
+ ASSERT_TRUE(handle.isValid());
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_GREATER_THAN(when, net->now());
+ ASSERT_EQUALS(when, net->runUntil(when));
+ }
+
+ executor->wait(handle);
+ ASSERT_OK(status);
+}
+
+TEST_F(AbstractAsyncComponentTest, CancelHandleDoesNothingOnInvalidHandle) {
+ MockAsyncComponent component(&getExecutor());
+ component.cancelHandle_forTest({});
+}
+
+TEST_F(AbstractAsyncComponentTest,
+ ScheduledTaskShouldFailWithCallbackCanceledIfHandleIsCanceledUsingCancelHandle) {
+ auto executor = &getExecutor();
+ MockAsyncComponent component(executor);
+ auto status = getDetectableErrorStatus();
+ auto callback = [&status](const executor::TaskExecutor::CallbackArgs& callbackArgs) {
+ status = callbackArgs.status;
+ };
+ auto handle =
+ unittest::assertGet(executor->scheduleWorkAt(executor->now() + Seconds(1), callback));
+ component.cancelHandle_forTest(handle);
+ executor->wait(handle);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status);
+}
+
+} // namespace