diff options
author | Benety Goh <benety@mongodb.com> | 2017-03-08 16:09:25 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-03-23 16:18:26 -0400 |
commit | 42d945d2446e0ca0aef570ec090e4d93ede00618 (patch) | |
tree | 2849ac57af6e9b9d72a82baf669b6f30503dfbea /src/mongo | |
parent | c1964e817459a575a64609229a473d57d08aa4d0 (diff) | |
download | mongo-42d945d2446e0ca0aef570ec090e4d93ede00618.tar.gz |
SERVER-28204 added Rollback interface and 3.6 implementation
This is a stub implementation of the new 3.6 rollback algorithm.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/SConscript | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback.h | 60 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl.cpp | 219 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl.h | 246 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl_test.cpp | 284 |
5 files changed, 844 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 8bf2a589003..149320004a6 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -330,6 +330,41 @@ env.CppUnitTest( ) env.Library( + target='rollback_impl', + source=[ + 'rollback_impl.cpp', + ], + LIBDEPS=[ + 'abstract_async_component', + 'optime', + 'repl_coordinator_interface', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + #'$BUILD_DIR/mongo/db/s/sharding', # CYCLE + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/executor/task_executor_interface', + '$BUILD_DIR/mongo/util/net/hostandport', + ], + LIBDEPS_TAGS=[ + # TODO(ADAM, 2017-01-09): See `CYCLE` tags above + 'illegal_cyclic_or_unresolved_dependencies_whitelisted', + ] +) + +env.CppUnitTest( + target='rollback_impl_test', + source=[ + 'rollback_impl_test.cpp', + ], + LIBDEPS=[ + 'oplog_interface_mock', + 'rollback_impl', + 'rollback_test_fixture', + 'task_executor_mock', + '$BUILD_DIR/mongo/base', + ], +) + +env.Library( target='rollback_source_impl', source=[ 'rollback_source_impl.cpp', diff --git a/src/mongo/db/repl/rollback.h b/src/mongo/db/repl/rollback.h new file mode 100644 index 00000000000..6f65ddb9830 --- /dev/null +++ b/src/mongo/db/repl/rollback.h @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/db/repl/optime.h" +#include "mongo/stdx/functional.h" + +namespace mongo { +namespace repl { + +/** + * This class represents the interface BackgroundSync and ReplicationCoordinatorExternalState use to + * interact with the rollback subsystem. + */ +class Rollback { + MONGO_DISALLOW_COPYING(Rollback); + +public: + /** + * Callback function to report results of rollback. On success, the last optime applied will be + * passed in. + */ + using OnCompletionFn = + stdx::function<void(const StatusWith<OpTime>& lastOpTimeApplied) noexcept>; + + Rollback() = default; + + virtual ~Rollback() = default; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp new file mode 100644 index 00000000000..917c378a407 --- /dev/null +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/rollback_impl.h" + +#include <exception> + +#include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/shard_identity_rollback_notifier.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { + +namespace { + +/** + * Creates an operation context using the current Client. + */ +ServiceContext::UniqueOperationContext makeOpCtx() { + return cc().makeOperationContext(); +} + +} // namespace + +RollbackImpl::RollbackImpl(executor::TaskExecutor* executor, + OplogInterface* localOplog, + const HostAndPort& syncSource, + int requiredRollbackId, + ReplicationCoordinator* replicationCoordinator, + StorageInterface* storageInterface, + const OnCompletionFn& onCompletion) + : AbstractAsyncComponent(executor, "rollback"), + _localOplog(localOplog), + _syncSource(syncSource), + _requiredRollbackId(requiredRollbackId), + _replicationCoordinator(replicationCoordinator), + _storageInterface(storageInterface), + _onCompletion(onCompletion) { + // Task executor will be validated by AbstractAsyncComponent's constructor. + invariant(localOplog); + uassert(ErrorCodes::BadValue, "sync source must be valid", !syncSource.empty()); + invariant(replicationCoordinator); + invariant(storageInterface); + invariant(onCompletion); +} + +RollbackImpl::~RollbackImpl() { + shutdown(); + join(); +} + +// static +StatusWith<OpTime> RollbackImpl::readLocalRollbackInfoAndApplyUntilConsistentWithSyncSource( + ReplicationCoordinator* replicationCoordinator, StorageInterface* storageInterface) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + +Status RollbackImpl::_doStartup_inlock() noexcept { + return _scheduleWorkAndSaveHandle_inlock( + stdx::bind(&RollbackImpl::_transitionToRollbackCallback, this, stdx::placeholders::_1), + &_transitionToRollbackHandle, + str::stream() << "_transitionToRollbackCallback"); +} + +void RollbackImpl::_doShutdown_inlock() noexcept { + _cancelHandle_inlock(_transitionToRollbackHandle); +} + +stdx::mutex* RollbackImpl::_getMutex() noexcept { + return &_mutex; +} + +void RollbackImpl::_transitionToRollbackCallback( + const executor::TaskExecutor::CallbackArgs& callbackArgs) { + auto status = Status::OK(); + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + status = _checkForShutdownAndConvertStatus_inlock( + callbackArgs, str::stream() << "error before transition to ROLLBACK"); + } + if (!status.isOK()) { + _finishCallback(nullptr, status); + return; + } + + log() << "Rollback - transition to ROLLBACK"; + auto opCtx = makeOpCtx(); + { + Lock::GlobalWrite globalWrite(opCtx.get()); + + if (!_replicationCoordinator->setFollowerMode(MemberState::RS_ROLLBACK)) { + std::string msg = str::stream() + << "Cannot transition from " << _replicationCoordinator->getMemberState().toString() + << " to " << MemberState(MemberState::RS_ROLLBACK).toString(); + log() << msg; + status = Status(ErrorCodes::NotSecondary, msg); + } + } + if (!status.isOK()) { + _finishCallback(opCtx.get(), status); + return; + } + + // Success! For now.... + _finishCallback(opCtx.get(), OpTime()); +} + +void RollbackImpl::_checkShardIdentityRollback(OperationContext* opCtx) { + invariant(opCtx); + + if (ShardIdentityRollbackNotifier::get(opCtx)->didRollbackHappen()) { + severe() << "shardIdentity document rollback detected. Shutting down to clear " + "in-memory sharding state. Restarting this process should safely return it " + "to a healthy state"; + fassertFailedNoTrace(40407); + } +} + +void RollbackImpl::_transitionFromRollbackToSecondary(OperationContext* opCtx) { + invariant(opCtx); + + Lock::GlobalWrite globalWrite(opCtx); + + // If the current member state is not ROLLBACK, this means that the + // ReplicationCoordinator::setFollowerMode(ROLLBACK) call in _transitionToRollbackCallback() + // failed. In that case, there's nothing to do in this function. + if (MemberState(MemberState::RS_ROLLBACK) != _replicationCoordinator->getMemberState()) { + return; + } + + if (!_replicationCoordinator->setFollowerMode(MemberState::RS_SECONDARY)) { + severe() << "Failed to transition into " << MemberState(MemberState::RS_SECONDARY) + << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK) + << " but found self in " << _replicationCoordinator->getMemberState(); + fassertFailedNoTrace(40408); + } +} + +void RollbackImpl::_tearDown(OperationContext* opCtx) { + invariant(opCtx); + + _checkShardIdentityRollback(opCtx); + _transitionFromRollbackToSecondary(opCtx); +} + +void RollbackImpl::_finishCallback(OperationContext* opCtx, StatusWith<OpTime> lastApplied) { + // After running callback function '_onCompletion', clear '_onCompletion' to release any + // resources that might be held by this function object. + // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case + // there is any logic that's invoked at the function object's destruction that might call into + // this RollbackImpl. 'onCompletion' must be destroyed outside the lock and this should happen + // before we transition the state to Complete. + decltype(_onCompletion) onCompletion; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_onCompletion); + std::swap(_onCompletion, onCompletion); + } + + // If 'opCtx' is null, lazily create OperationContext using makeOpCtx() that will last for the + // duration of this function call. + _tearDown(opCtx ? opCtx : makeOpCtx().get()); + + // Completion callback must be invoked outside mutex. + try { + onCompletion(lastApplied); + } catch (...) { + severe() << "rollback finish callback threw exception: " << redact(exceptionToStatus()); + // This exception handling block should be unreachable because OnCompletionFn is declared + // noexcept. This is purely a defensive mechanism to guard against C++ runtime + // implementations that have less than ideal support for noexcept. + MONGO_UNREACHABLE; + } + + // Destroy the remaining reference to the completion callback before we transition the state to + // Complete so that callers can expect any resources bound to '_onCompletion' to be released + // before RollbackImpl::join() returns. + onCompletion = {}; + + _transitionToComplete(); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h new file mode 100644 index 00000000000..4dc33a7405c --- /dev/null +++ b/src/mongo/db/repl/rollback_impl.h @@ -0,0 +1,246 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/db/repl/abstract_async_component.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/rollback.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { + +class OperationContext; + +namespace repl { + +class OplogInterface; +class ReplicationCoordinator; +class StorageInterface; + +/** + * During steady state replication, it is possible to find the local server in a state where it + * cannot replicate from a sync source. This can happen if the local server has gone offline and + * comes back to find a new primary with an inconsistent set of operations in its oplog from the + * local server. For example: + + * F = node that is on the wrong branch of history + * SS = sync source (usually primary) + * + * F : a b c d e f g + * SS : a b c d h + * + * In the example 'e', 'f', and 'g' are getting rolled back, 'h' is what's getting rolled forward. + * + * This class models the logic necessary to perform this 'rollback' procedure in two phases: + * + * 1) Information needed to perform the rollback is first read from the sync source and stored + * locally. The user-visible server state (all databases except for the 'local' database) is not + * changed during this information gathering phase. + * + * 2) After the first phase is completed, we read the persisted rollback information during the + * second phase and logically undo the local operations to the common point before apply the + * remote operations ('h' in the example above) forward until we reach a consistent point + * (relative to the sync source). + */ +class RollbackImpl : public AbstractAsyncComponent, public Rollback { +public: + /** + * This constructor is used to create a RollbackImpl instance that will run the entire + * rollback algorithm. This is called during steady state replication when we determine that we + * have to roll back after processing the first batch of oplog entries from the sync source. + * + * To resume an interrupted rollback process at startup, the caller has to use the + * ReplicationCoordinator to transition the member state to ROLLBACK and invoke + * readLocalRollbackInfoAndApplyUntilConsistentWithSyncSource() directly. + */ + RollbackImpl(executor::TaskExecutor* executor, + OplogInterface* localOplog, + const HostAndPort& syncSource, + int requiredRollbackId, + ReplicationCoordinator* replicationCoordinator, + StorageInterface* storageInterface, + const OnCompletionFn& onCompletion); + + virtual ~RollbackImpl(); + + /** + * This is a static function that will run the remaining work in the rollback algorithm + * (Phase 2) after persisting (in Phase 1) all the information we need from the sync source. + * This part of the rollback algorithm runs synchronously and may be invoked in one of two + * contexts: + * 1) as part of an rollback process started by an in-progress RollbackImpl; or + * 2) to resume a previously interrupted rollback process at server startup while initializing + * the replication subsystem. In this case, RollbackImpl will transition the member state + * to ROLLBACK before executing the rest of the rollback procedure. + * + * This function does not need to communicate with the sync source. + * + * Returns the last optime applied. + */ + static StatusWith<OpTime> readLocalRollbackInfoAndApplyUntilConsistentWithSyncSource( + ReplicationCoordinator* replicationCoordinator, StorageInterface* storageInterface); + +private: + /** + * Schedules the first task of the rollback algorithm, which is to transition to ROLLBACK. + * Returns ShutdownInProgress if the first task cannot be scheduled because the task executor is + * shutting down. + * + * Called by AbstractAsyncComponent::startup(). + */ + Status _doStartup_inlock() noexcept override; + + /** + * Cancels all outstanding work. + * + * Called by AbstractAsyncComponent::shutdown(). + */ + void _doShutdown_inlock() noexcept override; + + /** + * Returns mutex to guard this component's state variable. + * + * Used by AbstractAyncComponent to protect access to the component state stored in '_state'. + */ + stdx::mutex* _getMutex() noexcept override; + + /** + * Rollback flowchart (Incomplete): + * + * _doStartup_inlock() + * | + * | + * V + * _transitionToRollbackCallback() + * | + * | + * V + * _tearDown() + * | + * | + * V + * _finishCallback() + */ + + /** + * Uses the ReplicationCoordinator to transition the current member state to ROLLBACK. + * If the transition to ROLLBACK fails, this could mean that we have been elected PRIMARY. In + * this case, we invoke the completion function with a NotSecondary error. + * + * This callback is scheduled by _doStartup_inlock(). + */ + void _transitionToRollbackCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs); + + /** + * If we detected that we rolled back the shardIdentity document as part of this rollback + * then we must shut down the server to clear the in-memory ShardingState associated with the + * shardIdentity document. + * + * 'opCtx' cannot be null. + * + * Called by _tearDown(). + */ + void _checkShardIdentityRollback(OperationContext* opCtx); + + /** + * Transitions the current member state from ROLLBACK to SECONDARY. + * This operation must succeed. Otherwise, we will shut down the server. + * + * 'opCtx' cannot be null. + * + * Called by _tearDown(). + */ + void _transitionFromRollbackToSecondary(OperationContext* opCtx); + + /** + * Performs tear down steps before caller is notified of completion status. + * + * 'opCtx' cannot be null. + * + * Called by _finishCallback() before the completion callback '_onCompletion' is invoked. + */ + void _tearDown(OperationContext* opCtx); + + /** + * Invokes completion callback and transitions state to State::kComplete. + * _finishCallback() may require an OperationContext to perform certain tear down functions. + * It will create its own OperationContext unless the caller provides one in 'opCtx'. + * + * Calls _tearDown() to perform tear down steps before invoking completion callback. + * + * Finally, this function transitions the component state to Complete by invoking + * AbstractAsyncComponent::_transitionToComplete(). + */ + void _finishCallback(OperationContext* opCtx, StatusWith<OpTime> lastApplied); + + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access in any way from any context. + // (M) Reads and writes guarded by _mutex. + + // Guards access to member variables. + mutable stdx::mutex _mutex; // (S) + + // This is used to read oplog entries from the local oplog that will be rolled back. + OplogInterface* const _localOplog; // (R) + + // Host and port of the sync source we are rolling back against. + const HostAndPort _syncSource; // (R) + + // This is the current rollback ID on the sync source that we are rolling back against. + // It is an error if the rollback ID on the sync source changes before rollback is complete. + const int _requiredRollbackId; // (R) + + // This is used to read and update global replication settings. This includes: + // - update transition member states; + // - update current applied and durable optimes; and + // - update global rollback ID (that will be returned by the command replSetGetRBID). + ReplicationCoordinator* const _replicationCoordinator; // (R) + + // This is used to read and update the global minValid settings and to access the storage layer. + StorageInterface* const _storageInterface; // (R) + + // This is invoked with the final status of the rollback. If startup() fails, this callback + // is never invoked. The caller gets the last applied optime when the rollback completes + // successfully or an error status. + // '_onCompletion' is cleared on completion (in _finishCallback()) in order to release any + // resources that might be held by the callback function object. + OnCompletionFn _onCompletion; // (M) + + // Handle to currently scheduled _transitionToRollbackCallback() task. + executor::TaskExecutor::CallbackHandle _transitionToRollbackHandle; // (M) +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp new file mode 100644 index 00000000000..7bfbe47e255 --- /dev/null +++ b/src/mongo/db/repl/rollback_impl_test.cpp @@ -0,0 +1,284 @@ +/** + * Copyright 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/rollback_test_fixture.h" + +#include <memory> + +#include "mongo/db/repl/oplog_interface_mock.h" +#include "mongo/db/repl/rollback_impl.h" +#include "mongo/db/repl/task_executor_mock.h" +#include "mongo/db/s/shard_identity_rollback_notifier.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/scopeguard.h" + +namespace { + +using namespace mongo; +using namespace mongo::repl; + +const OplogInterfaceMock::Operations kEmptyMockOperations; + +/** + * Unit test for rollback implementation introduced in 3.6. + */ +class RollbackImplTest : public RollbackTest { +private: + void setUp() override; + void tearDown() override; + +protected: + TaskExecutorMock::ShouldFailRequestFn _shouldFailScheduleRemoteCommandRequest; + std::unique_ptr<TaskExecutorMock> _taskExecutorMock; + Rollback::OnCompletionFn _onCompletion; + StatusWith<OpTime> _onCompletionResult = executor::TaskExecutorTest::getDetectableErrorStatus(); + std::unique_ptr<OplogInterfaceMock> _localOplog; + int _requiredRollbackId; + std::unique_ptr<RollbackImpl> _rollback; +}; + +void RollbackImplTest::setUp() { + RollbackTest::setUp(); + _shouldFailScheduleRemoteCommandRequest = [](const executor::RemoteCommandRequest& request) { + return false; + }; + _taskExecutorMock = stdx::make_unique<TaskExecutorMock>( + &_threadPoolExecutorTest.getExecutor(), + [this](const executor::RemoteCommandRequest& request) { + return _shouldFailScheduleRemoteCommandRequest(request); + }); + _localOplog = stdx::make_unique<OplogInterfaceMock>(kEmptyMockOperations); + HostAndPort syncSource("localhost", 1234); + _requiredRollbackId = 1; + _onCompletionResult = executor::TaskExecutorTest::getDetectableErrorStatus(); + _onCompletion = [this](const StatusWith<OpTime>& result) noexcept { + _onCompletionResult = result; + }; + _rollback = stdx::make_unique<RollbackImpl>( + _taskExecutorMock.get(), + _localOplog.get(), + syncSource, + _requiredRollbackId, + _coordinator, + &_storageInterface, + [this](const StatusWith<OpTime>& result) noexcept { _onCompletion(result); }); +} + +void RollbackImplTest::tearDown() { + _threadPoolExecutorTest.shutdownExecutorThread(); + _threadPoolExecutorTest.joinExecutorThread(); + + _rollback = {}; + _onCompletionResult = executor::TaskExecutorTest::getDetectableErrorStatus(); + _onCompletion = {}; + _requiredRollbackId = -1; + _localOplog = {}; + _taskExecutorMock = {}; + _shouldFailScheduleRemoteCommandRequest = {}; + RollbackTest::tearDown(); +} + +TEST_F(RollbackImplTest, TestFixtureSetUpInitializesStorageEngine) { + auto serviceContext = _serviceContextMongoDTest.getServiceContext(); + ASSERT_TRUE(serviceContext); + ASSERT_TRUE(serviceContext->getGlobalStorageEngine()); +} + +TEST_F(RollbackImplTest, TestFixtureSetUpInitializesTaskExecutor) { + auto net = _threadPoolExecutorTest.getNet(); + ASSERT_TRUE(net); + auto&& executor = _threadPoolExecutorTest.getExecutor(); + ASSERT_NOT_EQUALS(Date_t(), executor.now()); +} + +TEST_F(RollbackImplTest, InvalidConstruction) { + auto executor = &_threadPoolExecutorTest.getExecutor(); + OplogInterfaceMock emptyOplog(kEmptyMockOperations); + HostAndPort syncSource("localhost", 1234); + int requiredRollbackId = 1; + auto replicationCoordinator = _coordinator; + auto storageInterface = &_storageInterface; + auto onCompletion = [](const StatusWith<OpTime>&) noexcept {}; + + // Null task executor. + // This check is performed in AbstractAsyncComponent's constructor. + ASSERT_THROWS_CODE_AND_WHAT(RollbackImpl(nullptr, + &emptyOplog, + syncSource, + requiredRollbackId, + replicationCoordinator, + storageInterface, + onCompletion), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); + + // Invalid sync source. + ASSERT_THROWS_CODE_AND_WHAT(RollbackImpl(executor, + &emptyOplog, + HostAndPort(), + requiredRollbackId, + replicationCoordinator, + storageInterface, + onCompletion), + UserException, + ErrorCodes::BadValue, + "sync source must be valid"); +} + +TEST_F(RollbackImplTest, + StartupReturnsOperationFailedIfMockExecutorFailsToScheduleRollbackTransitionCallback) { + _taskExecutorMock->shouldFailScheduleWork = true; + ASSERT_EQUALS(ErrorCodes::OperationFailed, _rollback->startup()); +} + +TEST_F( + RollbackImplTest, + RollbackReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingTransitionToRollbackCallback) { + _taskExecutorMock->shouldDeferScheduleWorkByOneSecond = true; + ASSERT_OK(_rollback->startup()); + _threadPoolExecutorTest.getExecutor().shutdown(); + _rollback->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _onCompletionResult); +} + +TEST_F( + RollbackImplTest, + RollbackReturnsCallbackCanceledIfRollbackIsShutdownAfterSchedulingTransitionToRollbackCallback) { + _taskExecutorMock->shouldDeferScheduleWorkByOneSecond = true; + ASSERT_OK(_rollback->startup()); + _rollback->shutdown(); + _rollback->join(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _onCompletionResult); +} + +DEATH_TEST_F(RollbackImplTest, RollbackTerminatesIfCompletionCallbackThrowsException, "terminate") { + _taskExecutorMock->shouldDeferScheduleWorkByOneSecond = true; + ASSERT_OK(_rollback->startup()); + _onCompletion = [](const StatusWith<OpTime>&) noexcept { + uassertStatusOK({ErrorCodes::InternalError, + "exception from RollbackTerminatesIfCompletionCallbackThrowsException"}); + }; + _rollback->shutdown(); + _rollback->join(); + MONGO_UNREACHABLE; +} + +TEST_F(RollbackImplTest, RollbackReturnsNotSecondaryOnFailingToTransitionToRollback) { + _coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_ROLLBACK; + ASSERT_OK(_rollback->startup()); + _rollback->join(); + ASSERT_EQUALS(ErrorCodes::NotSecondary, _onCompletionResult); +} + +DEATH_TEST_F(RollbackImplTest, + RollbackTriggersFatalAssertionOnDetectingShardIdentityDocumentRollback, + "shardIdentity document rollback detected. Shutting down to clear in-memory sharding " + "state. Restarting this process should safely return it to a healthy state") { + ASSERT_FALSE(ShardIdentityRollbackNotifier::get(_opCtx.get())->didRollbackHappen()); + ShardIdentityRollbackNotifier::get(_opCtx.get())->recordThatRollbackHappened(); + ASSERT_TRUE(ShardIdentityRollbackNotifier::get(_opCtx.get())->didRollbackHappen()); + + ASSERT_OK(_rollback->startup()); + _rollback->join(); +} + +DEATH_TEST_F( + RollbackImplTest, + RollbackTriggersFatalAssertionOnFailingToTransitionFromRollbackToSecondaryDuringTearDownPhase, + "Failed to transition into SECONDARY; expected to be in state ROLLBACK but found self in " + "ROLLBACK") { + _coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_SECONDARY; + ASSERT_OK(_rollback->startup()); + _rollback->join(); +} + +TEST_F(RollbackImplTest, RollbackReturnsLastAppliedOpTimeOnSuccess) { + ASSERT_OK(_rollback->startup()); + _rollback->join(); + ASSERT_EQUALS(OpTime(), unittest::assertGet(_onCompletionResult)); +} + +class SharedCallbackState { + MONGO_DISALLOW_COPYING(SharedCallbackState); + +public: + explicit SharedCallbackState(bool* sharedCallbackStateDestroyed) + : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {} + ~SharedCallbackState() { + *_sharedCallbackStateDestroyed = true; + } + +private: + bool* _sharedCallbackStateDestroyed; +}; + +TEST_F(RollbackImplTest, RollbackResetsOnCompletionCallbackFunctionPointerUponCompletion) { + bool sharedCallbackStateDestroyed = false; + auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed); + decltype(_onCompletionResult) lastApplied = _threadPoolExecutorTest.getDetectableErrorStatus(); + + _rollback = stdx::make_unique<RollbackImpl>( + _taskExecutorMock.get(), + _localOplog.get(), + HostAndPort("localhost", 1234), + _requiredRollbackId, + _coordinator, + &_storageInterface, + [&lastApplied, sharedCallbackData ](const StatusWith<OpTime>& result) noexcept { + lastApplied = result; + }); + ON_BLOCK_EXIT([this]() { _taskExecutorMock->shutdown(); }); + + // Completion callback will be invoked on errors after startup() returns successfully. + // We cause the the rollback process to error out early by failing to transition to rollback. + _coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_ROLLBACK; + + ASSERT_OK(_rollback->startup()); + + // After 'sharedCallbackData' is reset, RollbackImpl will hold the last reference count to + // SharedCallbackState. + sharedCallbackData.reset(); + ASSERT_FALSE(sharedCallbackStateDestroyed); + + _rollback->join(); + ASSERT_EQUALS(ErrorCodes::NotSecondary, lastApplied); + + // RollbackImpl should reset 'RollbackImpl::_onCompletion' after running callback function + // for the last time before becoming inactive. + // This ensures that we release resources associated with 'RollbackImpl::_onCompletion'. + ASSERT_TRUE(sharedCallbackStateDestroyed); +} + +} // namespace |