diff options
author | Benety Goh <benety@mongodb.com> | 2017-03-14 17:55:59 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-03-20 13:36:34 -0400 |
commit | 4af23eb406571c1559ac4022c1c2fc27f00ae93f (patch) | |
tree | dbbf204972e19ea75f91c14c7e85ef326166dbc7 /src | |
parent | 46dc518b4531b21a60675f7b9b1a9e82a50bbc0c (diff) | |
download | mongo-4af23eb406571c1559ac4022c1c2fc27f00ae93f.tar.gz |
SERVER-28204 added AbstractAsyncComponent
This defines the life cycle for task executor based async components in replication.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_async_component.cpp | 220 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_async_component.h | 211 | ||||
-rw-r--r-- | src/mongo/db/repl/abstract_async_component_test.cpp | 440 |
4 files changed, 894 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 1096d248fbd..bd4bfdbbfc6 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -57,6 +57,29 @@ env.Library('rslog', ]) env.Library( + target='abstract_async_component', + source=[ + 'abstract_async_component.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/executor/task_executor_interface', + ], +) + +env.CppUnitTest( + target='abstract_async_component_test', + source=[ + 'abstract_async_component_test.cpp', + ], + LIBDEPS=[ + 'abstract_async_component', + 'task_executor_mock', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + ], +) + +env.Library( target='storage_interface', source=[ 'storage_interface.cpp', diff --git a/src/mongo/db/repl/abstract_async_component.cpp b/src/mongo/db/repl/abstract_async_component.cpp new file mode 100644 index 00000000000..dcf49b39a0b --- /dev/null +++ b/src/mongo/db/repl/abstract_async_component.cpp @@ -0,0 +1,220 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/abstract_async_component.h" + +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { + +AbstractAsyncComponent::AbstractAsyncComponent(executor::TaskExecutor* executor, + const std::string& componentName) + : _executor(executor), _componentName(componentName) { + uassert(ErrorCodes::BadValue, "task executor cannot be null", executor); +} + +executor::TaskExecutor* AbstractAsyncComponent::_getExecutor() { + return _executor; +} + +bool AbstractAsyncComponent::isActive() noexcept { + stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + return _isActive_inlock(); +} + +bool AbstractAsyncComponent::_isActive_inlock() noexcept { + return State::kRunning == _state || State::kShuttingDown == _state; +} + +bool AbstractAsyncComponent::_isShuttingDown() noexcept { + stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + return _isShuttingDown_inlock(); +} + +bool AbstractAsyncComponent::_isShuttingDown_inlock() noexcept { + return State::kShuttingDown == _state; +} + +Status AbstractAsyncComponent::startup() noexcept { + stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + switch (_state) { + case State::kPreStart: + _state = State::kRunning; + break; + case State::kRunning: + return Status(ErrorCodes::IllegalOperation, + str::stream() << _componentName << " already started"); + case State::kShuttingDown: + return Status(ErrorCodes::ShutdownInProgress, + str::stream() << _componentName << " shutting down"); + case State::kComplete: + return Status(ErrorCodes::ShutdownInProgress, + str::stream() << _componentName << " completed"); + } + + auto status = _doStartup_inlock(); + + if (!status.isOK()) { + _state = State::kComplete; + return status; + } + + return Status::OK(); +} + +void AbstractAsyncComponent::shutdown() noexcept { + stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + switch (_state) { + case State::kPreStart: + // Transition directly from PreStart to Complete if not started yet. + _state = State::kComplete; + return; + case State::kRunning: + _state = State::kShuttingDown; + break; + case State::kShuttingDown: + case State::kComplete: + // Nothing to do if we are already in ShuttingDown or Complete state. + return; + } + + _doShutdown_inlock(); +} + +void AbstractAsyncComponent::join() noexcept { + stdx::unique_lock<stdx::mutex> lk(*_getMutex()); + _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); }); +} + +AbstractAsyncComponent::State AbstractAsyncComponent::getState_forTest() noexcept { + stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + return _state; +} + +void AbstractAsyncComponent::_transitionToComplete() noexcept { + stdx::lock_guard<stdx::mutex> lock(*_getMutex()); + _transitionToComplete_inlock(); +} + +void AbstractAsyncComponent::_transitionToComplete_inlock() noexcept { + invariant(State::kComplete != _state); + _state = State::kComplete; + _stateCondition.notify_all(); +} + +Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) { + return _checkForShutdownAndConvertStatus_inlock(callbackArgs.status, message); +} + +Status AbstractAsyncComponent::_checkForShutdownAndConvertStatus_inlock( + const Status& status, const std::string& message) { + + if (_isShuttingDown_inlock()) { + return Status(ErrorCodes::CallbackCanceled, + str::stream() << message << ": " << _componentName << " is shutting down"); + } + + if (!status.isOK()) { + return Status(status.code(), message + ": " + status.reason()); + } + + return Status::OK(); +} + +Status AbstractAsyncComponent::_scheduleWorkAndSaveHandle_inlock( + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name) { + invariant(handle); + if (_isShuttingDown_inlock()) { + return Status(ErrorCodes::CallbackCanceled, + str::stream() << "failed to schedule work " << name << ": " << _componentName + << " is shutting down"); + } + auto result = _executor->scheduleWork(work); + if (!result.isOK()) { + return Status(result.getStatus().code(), + str::stream() << "failed to schedule work " << name << ": " + << result.getStatus().reason()); + } + *handle = result.getValue(); + return Status::OK(); +} + +Status AbstractAsyncComponent::_scheduleWorkAtAndSaveHandle_inlock( + Date_t when, + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name) { + invariant(handle); + if (_isShuttingDown_inlock()) { + return Status( + ErrorCodes::CallbackCanceled, + str::stream() << "failed to schedule work " << name << " at " << when.toString() << ": " + << _componentName + << " is shutting down"); + } + auto result = _executor->scheduleWorkAt(when, work); + if (!result.isOK()) { + return Status( + result.getStatus().code(), + str::stream() << "failed to schedule work " << name << " at " << when.toString() << ": " + << result.getStatus().reason()); + } + *handle = result.getValue(); + return Status::OK(); +} + +void AbstractAsyncComponent::_cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle) { + if (!handle) { + return; + } + _executor->cancel(handle); +} + +std::ostream& operator<<(std::ostream& os, const AbstractAsyncComponent::State& state) { + switch (state) { + case AbstractAsyncComponent::State::kPreStart: + return os << "PreStart"; + case AbstractAsyncComponent::State::kRunning: + return os << "Running"; + case AbstractAsyncComponent::State::kShuttingDown: + return os << "ShuttingDown"; + case AbstractAsyncComponent::State::kComplete: + return os << "Complete"; + } + MONGO_UNREACHABLE; +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/abstract_async_component.h b/src/mongo/db/repl/abstract_async_component.h new file mode 100644 index 00000000000..56f79890332 --- /dev/null +++ b/src/mongo/db/repl/abstract_async_component.h @@ -0,0 +1,211 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <iosfwd> +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { +namespace repl { + +/** + * This class represents an abstract base class for replication components that run asynchronously + * using the executor::TaskExecutor framework. It defines the startup/shutdown semantics with the + * added guarantee that components can be run at most once. + * + * The _state variable in this class is protected by the concrete class's mutex (returned by + * _getMutex()). + */ +class AbstractAsyncComponent { + MONGO_DISALLOW_COPYING(AbstractAsyncComponent); + +public: + AbstractAsyncComponent(executor::TaskExecutor* executor, const std::string& componentName); + + virtual ~AbstractAsyncComponent() = default; + + /** + * Returns true if this component is currently running or in the process of shutting down. + */ + bool isActive() noexcept; + + /** + * Starts the component. If the transition from PreStart to Running is allowed, this invokes + * _doStartup_inlock() defined in the concrete class. If _doStartup_inlock() fails, this + * component will transition to Complete and any restarts after this will be disallowed. + */ + Status startup() noexcept; + + /** + * Signals this component to begin shutting down. If the transition from Running to ShuttingDown + * is allowed, this invokes _doShutdown_inlock() defined in the concrete class. + * Transition directly from PreStart to Complete if not started yet. + */ + void shutdown() noexcept; + + /** + * Blocks until inactive. + */ + void join() noexcept; + + /** + * State transitions: + * PreStart --> Running --> ShuttingDown --> Complete + * It is possible to skip intermediate states. For example, calling shutdown() when the + * component has not started will transition from PreStart directly to Complete. + */ + enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; + + /** + * Returns current component state. + * For testing only. + */ + State getState_forTest() noexcept; + +protected: + /** + * Returns task executor. + */ + executor::TaskExecutor* _getExecutor(); + + /** + * Returns true if this component is currently running or in the process of shutting down. + */ + bool _isActive_inlock() noexcept; + + /** + * Returns true if this component has received a shutdown request ('_state' is ShuttingDown). + */ + bool _isShuttingDown() noexcept; + bool _isShuttingDown_inlock() noexcept; + + /** + * Transitions this component to complete and notifies any waiters on '_stateCondition'. + * May be called at most once. + */ + void _transitionToComplete() noexcept; + void _transitionToComplete_inlock() noexcept; + + /** + * Checks the given status (or embedded status inside the callback args) and current component + * shutdown state. If the given status is not OK or if we are shutting down, returns a new error + * status that should be passed to _finishCallback. The reason in the new error status will + * include 'message'. + * Otherwise, returns Status::OK(). + */ + Status _checkForShutdownAndConvertStatus_inlock( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message); + Status _checkForShutdownAndConvertStatus_inlock(const Status& status, + const std::string& message); + + /** + * Schedules work to be run by the task executor. + * Saves handle if work was successfully scheduled. + * Returns scheduleWork status (without the handle). + */ + Status _scheduleWorkAndSaveHandle_inlock(const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + Status _scheduleWorkAtAndSaveHandle_inlock(Date_t when, + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + + /** + * Cancels task executor callback handle if not null. + */ + void _cancelHandle_inlock(executor::TaskExecutor::CallbackHandle handle); + +private: + /** + * Invoked by startup() to run startup procedure after a successful transition from PreStart to + * Running. + * Invoked at most once by AbstractAsyncComponent. + * May not throw exceptions. + * + * If _doStartup_inlock() fails, startup() will transition this component from Running to + * Complete. Subsequent startup() attempts will return an IllegalOperation error. + * + * If _doStartup_inlock() succeeds, the component stays in Running (or ShuttingDown if + * shutdown() is called) until the component has finished its processing (transtion to + * Complete). + * + * It is the responsibility of the implementation to transition the component state to Complete + * by calling _transitionToComplete_inlock() once the component has finished its processing. + */ + virtual Status _doStartup_inlock() noexcept = 0; + + /** + * Runs shutdown procedure after a successful transition from Running to ShuttingDown. + * Invoked at most once by AbstractAsyncComponent. + * May not throw exceptions. + */ + virtual void _doShutdown_inlock() noexcept = 0; + + /** + * Returns mutex to guard this component's state variable. + */ + virtual stdx::mutex* _getMutex() noexcept = 0; + +private: + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access in any way from any context. + // (M) Reads and writes guarded by mutex returned by _getMutex(). + + // Task executor used to schedule tasks and remote commands. + executor::TaskExecutor* const _executor; // (R) + + // Component name used in error messages generated by startup(). + const std::string _componentName; // (R) + + // Current component state. See comments for State enum class for details. + // Protected by mutex in concrete class returned in _getMutex(). + State _state = State::kPreStart; // (M) + + // Used by _transitionToComplete_inlock() to signal changes in '_state'. + mutable stdx::condition_variable _stateCondition; // (S) +}; + +/** + * Insertion operator for AbstractAsyncComponent::State. Formats state for output stream. + * For testing only. + */ +std::ostream& operator<<(std::ostream& os, const AbstractAsyncComponent::State& state); + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp new file mode 100644 index 00000000000..6c2e8cd95f6 --- /dev/null +++ b/src/mongo/db/repl/abstract_async_component_test.cpp @@ -0,0 +1,440 @@ +/** + * Copyright 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/status.h" +#include "mongo/db/repl/abstract_async_component.h" +#include "mongo/db/repl/task_executor_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/stdx/mutex.h" + +#include "mongo/unittest/unittest.h" + +namespace { + +using namespace mongo; +using namespace mongo::repl; + +/** + * Mock implementation of AbstractAsyncComponent that supports returning errors from + * _doStartup_inlock() and also tracks if this function has ever been called by + * AbstractAsyncComponent. + */ +class MockAsyncComponent : public AbstractAsyncComponent { +public: + explicit MockAsyncComponent(executor::TaskExecutor* executor); + + /** + * Publicly visible versions of _checkForShutdownAndConvertStatus_inlock() for testing. + */ + Status checkForShutdownAndConvertStatus_forTest( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message); + Status checkForShutdownAndConvertStatus_forTest(const Status& status, + const std::string& message); + + /** + * Publicly visible versions of _scheduleWorkAndSaveHandle_inlock() and + * _scheduleWorkAtAndSaveHandle_inlock() for testing. + */ + Status scheduleWorkAndSaveHandle_forTest(const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + + /** + * Publicly visible version of _scheduleWorkAtAndSaveHandle_inlock() for testing. + */ + Status scheduleWorkAtAndSaveHandle_forTest(Date_t when, + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name); + + /** + * Publicly visible version of _cancelHandle_inlock() for testing. + */ + void cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle); + +private: + Status _doStartup_inlock() noexcept override; + void _doShutdown_inlock() noexcept override; + stdx::mutex* _getMutex() noexcept override; + + // Used by AbstractAsyncComponent to guard start changes. + stdx::mutex _mutex; + +public: + // Returned by _doStartup_inlock(). Override for testing. + Status doStartupResult = Status::OK(); + + // Set to true when _doStartup_inlock() is called. + bool doStartupCalled = false; +}; + +MockAsyncComponent::MockAsyncComponent(executor::TaskExecutor* executor) + : AbstractAsyncComponent(executor, "mock component") {} + +Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest( + const executor::TaskExecutor::CallbackArgs& callbackArgs, const std::string& message) { + return _checkForShutdownAndConvertStatus_inlock(callbackArgs, message); +} + +Status MockAsyncComponent::checkForShutdownAndConvertStatus_forTest(const Status& status, + const std::string& message) { + return _checkForShutdownAndConvertStatus_inlock(status, message); +} + +Status MockAsyncComponent::scheduleWorkAndSaveHandle_forTest( + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _scheduleWorkAndSaveHandle_inlock(work, handle, name); +} + +Status MockAsyncComponent::scheduleWorkAtAndSaveHandle_forTest( + Date_t when, + const executor::TaskExecutor::CallbackFn& work, + executor::TaskExecutor::CallbackHandle* handle, + const std::string& name) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _scheduleWorkAtAndSaveHandle_inlock(when, work, handle, name); +} + +void MockAsyncComponent::cancelHandle_forTest(executor::TaskExecutor::CallbackHandle handle) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _cancelHandle_inlock(handle); +} + +Status MockAsyncComponent::_doStartup_inlock() noexcept { + doStartupCalled = true; + return doStartupResult; +} + +void MockAsyncComponent::_doShutdown_inlock() noexcept {} + +stdx::mutex* MockAsyncComponent::_getMutex() noexcept { + return &_mutex; +} + +class AbstractAsyncComponentTest : public executor::ThreadPoolExecutorTest { +private: + void setUp() override; + void tearDown() override; +}; + +void AbstractAsyncComponentTest::setUp() { + executor::ThreadPoolExecutorTest::setUp(); + + launchExecutorThread(); +} + +void AbstractAsyncComponentTest::tearDown() { + shutdownExecutorThread(); + joinExecutorThread(); + + executor::ThreadPoolExecutorTest::tearDown(); +} + +TEST_F(AbstractAsyncComponentTest, ConstructorThrowsUserAssertionOnNullTaskExecutor) { + ASSERT_THROWS_CODE_AND_WHAT(MockAsyncComponent(nullptr), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); +} + +TEST_F(AbstractAsyncComponentTest, StateTransitionsToRunningAfterSuccessfulStartup) { + MockAsyncComponent component(&getExecutor()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + ASSERT_OK(component.startup()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest()); + ASSERT_TRUE(component.isActive()); + ASSERT_TRUE(component.doStartupCalled); +} + +TEST_F(AbstractAsyncComponentTest, StartupReturnsIllegalOperationIfAlreadyActive) { + MockAsyncComponent component(&getExecutor()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + ASSERT_OK(component.startup()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest()); + ASSERT_TRUE(component.isActive()); + ASSERT_TRUE(component.doStartupCalled); + + component.doStartupCalled = false; + + auto status = component.startup(); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, status); + ASSERT_EQUALS("mock component already started", status.reason()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest()); + ASSERT_TRUE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); +} + +TEST_F(AbstractAsyncComponentTest, StartupReturnsShutdownInProgressIfComponentIsShuttingDown) { + MockAsyncComponent component(&getExecutor()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + ASSERT_OK(component.startup()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kRunning, component.getState_forTest()); + ASSERT_TRUE(component.isActive()); + ASSERT_TRUE(component.doStartupCalled); + + component.doStartupCalled = false; + component.shutdown(); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest()); + ASSERT_TRUE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + auto status = component.startup(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status); + ASSERT_EQUALS("mock component shutting down", status.reason()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest()); + ASSERT_TRUE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); +} + +TEST_F(AbstractAsyncComponentTest, + StartupTransitionsToCompleteAndPassesThroughErrorFromDoStartupInLock) { + MockAsyncComponent component(&getExecutor()); + component.doStartupResult = {ErrorCodes::OperationFailed, "mock component startup failed"}; + + ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + auto status = component.startup(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, status); + ASSERT_EQUALS("mock component startup failed", status.reason()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kComplete, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_TRUE(component.doStartupCalled); +} + +TEST_F(AbstractAsyncComponentTest, ShutdownTransitionsStateToCompleteIfCalledBeforeStartup) { + MockAsyncComponent component(&getExecutor()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kPreStart, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + component.shutdown(); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kComplete, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); + + auto status = component.startup(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status); + ASSERT_EQUALS("mock component completed", status.reason()); + + ASSERT_EQUALS(AbstractAsyncComponent::State::kComplete, component.getState_forTest()); + ASSERT_FALSE(component.isActive()); + ASSERT_FALSE(component.doStartupCalled); +} + +executor::TaskExecutor::CallbackArgs statusToCallbackArgs(const Status& status) { + return executor::TaskExecutor::CallbackArgs(nullptr, {}, status); +} + +TEST_F(AbstractAsyncComponentTest, + CheckForShutdownAndConvertStatusReturnsCallbackCanceledIfComponentIsShuttingDown) { + MockAsyncComponent component(&getExecutor()); + // Skipping checks on component state because these are done in + // StartupReturnsShutdownInProgressIfComponentIsShuttingDown. + ASSERT_OK(component.startup()); + component.shutdown(); + ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest()); + + auto status = getDetectableErrorStatus(); + ASSERT_NOT_EQUALS(ErrorCodes::CallbackCanceled, status); + ASSERT_EQUALS( + ErrorCodes::CallbackCanceled, + component.checkForShutdownAndConvertStatus_forTest(statusToCallbackArgs(status), "mytask")); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, + component.checkForShutdownAndConvertStatus_forTest(status, "mytask")); +} + +TEST_F(AbstractAsyncComponentTest, + CheckForShutdownAndConvertStatusPrependsMessageToReasonInNewErrorStatus) { + MockAsyncComponent component(&getExecutor()); + + auto status = getDetectableErrorStatus(); + auto newStatus = + component.checkForShutdownAndConvertStatus_forTest(statusToCallbackArgs(status), "mytask"); + ASSERT_EQUALS(status.code(), newStatus.code()); + ASSERT_EQUALS("mytask: " + status.reason(), newStatus.reason()); + + newStatus = component.checkForShutdownAndConvertStatus_forTest(status, "mytask"); + ASSERT_EQUALS(status.code(), newStatus.code()); + ASSERT_EQUALS("mytask: " + status.reason(), newStatus.reason()); +} + +TEST_F(AbstractAsyncComponentTest, CheckForShutdownAndConvertStatusPassesThroughSuccessfulStatus) { + MockAsyncComponent component(&getExecutor()); + + auto status = Status::OK(); + ASSERT_OK( + component.checkForShutdownAndConvertStatus_forTest(statusToCallbackArgs(status), "mytask")); + ASSERT_OK(component.checkForShutdownAndConvertStatus_forTest(status, "mytask")); +} + +TEST_F(AbstractAsyncComponentTest, + ScheduleWorkAndSaveHandleReturnsCallbackCanceledIfComponentIsShuttingDown) { + MockAsyncComponent component(&getExecutor()); + // Skipping checks on component state because these are done in + // StartupReturnsShutdownInProgressIfComponentIsShuttingDown. + ASSERT_OK(component.startup()); + component.shutdown(); + ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest()); + + auto callback = [](const executor::TaskExecutor::CallbackArgs&) {}; + executor::TaskExecutor::CallbackHandle handle; + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, + component.scheduleWorkAndSaveHandle_forTest(callback, &handle, "mytask")); +} + +TEST_F(AbstractAsyncComponentTest, + ScheduleWorkAndSaveHandlePassesThroughErrorFromTaskExecutorScheduleWork) { + TaskExecutorMock taskExecutorMock(&getExecutor(), + [](const executor::RemoteCommandRequest&) { return false; }); + MockAsyncComponent component(&taskExecutorMock); + + taskExecutorMock.shouldFailScheduleWork = true; + + auto callback = [](const executor::TaskExecutor::CallbackArgs&) {}; + executor::TaskExecutor::CallbackHandle handle; + ASSERT_EQUALS(ErrorCodes::OperationFailed, + component.scheduleWorkAndSaveHandle_forTest(callback, &handle, "mytask")); +} + +TEST_F(AbstractAsyncComponentTest, ScheduleWorkAndSaveHandleSchedulesTaskSuccessfully) { + auto executor = &getExecutor(); + MockAsyncComponent component(executor); + auto status = getDetectableErrorStatus(); + auto callback = [&status](const executor::TaskExecutor::CallbackArgs& callbackArgs) { + status = callbackArgs.status; + }; + executor::TaskExecutor::CallbackHandle handle; + ASSERT_OK(component.scheduleWorkAndSaveHandle_forTest(callback, &handle, "mytask")); + ASSERT_TRUE(handle.isValid()); + executor->wait(handle); + ASSERT_OK(status); +} + +TEST_F(AbstractAsyncComponentTest, + ScheduleWorkAtAndSaveHandleReturnsCallbackCanceledIfComponentIsShuttingDown) { + MockAsyncComponent component(&getExecutor()); + // Skipping checks on component state because these are done in + // StartupReturnsShutdownInProgressIfComponentIsShuttingDown. + ASSERT_OK(component.startup()); + component.shutdown(); + ASSERT_EQUALS(AbstractAsyncComponent::State::kShuttingDown, component.getState_forTest()); + + auto when = getExecutor().now() + Seconds(1); + auto callback = [](const executor::TaskExecutor::CallbackArgs&) {}; + executor::TaskExecutor::CallbackHandle handle; + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, + component.scheduleWorkAtAndSaveHandle_forTest(when, callback, &handle, "mytask")); +} + +TEST_F(AbstractAsyncComponentTest, + ScheduleWorkAtAndSaveHandlePassesThroughErrorFromTaskExecutorScheduleWork) { + TaskExecutorMock taskExecutorMock(&getExecutor(), + [](const executor::RemoteCommandRequest&) { return false; }); + MockAsyncComponent component(&taskExecutorMock); + + taskExecutorMock.shouldFailScheduleWorkAt = true; + + auto when = getExecutor().now() + Seconds(1); + auto callback = [](const executor::TaskExecutor::CallbackArgs&) {}; + executor::TaskExecutor::CallbackHandle handle; + ASSERT_EQUALS(ErrorCodes::OperationFailed, + component.scheduleWorkAtAndSaveHandle_forTest(when, callback, &handle, "mytask")); +} + +TEST_F(AbstractAsyncComponentTest, ScheduleWorkAtAndSaveHandleSchedulesTaskSuccessfully) { + auto executor = &getExecutor(); + MockAsyncComponent component(executor); + auto when = executor->now() + Seconds(1); + auto status = getDetectableErrorStatus(); + auto callback = [&status](const executor::TaskExecutor::CallbackArgs& callbackArgs) { + status = callbackArgs.status; + }; + executor::TaskExecutor::CallbackHandle handle; + ASSERT_OK(component.scheduleWorkAtAndSaveHandle_forTest(when, callback, &handle, "mytask")); + ASSERT_TRUE(handle.isValid()); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_GREATER_THAN(when, net->now()); + ASSERT_EQUALS(when, net->runUntil(when)); + } + + executor->wait(handle); + ASSERT_OK(status); +} + +TEST_F(AbstractAsyncComponentTest, CancelHandleDoesNothingOnInvalidHandle) { + MockAsyncComponent component(&getExecutor()); + component.cancelHandle_forTest({}); +} + +TEST_F(AbstractAsyncComponentTest, + ScheduledTaskShouldFailWithCallbackCanceledIfHandleIsCanceledUsingCancelHandle) { + auto executor = &getExecutor(); + MockAsyncComponent component(executor); + auto status = getDetectableErrorStatus(); + auto callback = [&status](const executor::TaskExecutor::CallbackArgs& callbackArgs) { + status = callbackArgs.status; + }; + auto handle = + unittest::assertGet(executor->scheduleWorkAt(executor->now() + Seconds(1), callback)); + component.cancelHandle_forTest(handle); + executor->wait(handle); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status); +} + +} // namespace |