summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-03-08 16:09:25 -0500
committerBenety Goh <benety@mongodb.com>2017-03-23 16:18:26 -0400
commit42d945d2446e0ca0aef570ec090e4d93ede00618 (patch)
tree2849ac57af6e9b9d72a82baf669b6f30503dfbea /src/mongo/db
parentc1964e817459a575a64609229a473d57d08aa4d0 (diff)
downloadmongo-42d945d2446e0ca0aef570ec090e4d93ede00618.tar.gz
SERVER-28204 added Rollback interface and 3.6 implementation
This is a stub implementation of the new 3.6 rollback algorithm.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript35
-rw-r--r--src/mongo/db/repl/rollback.h60
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp219
-rw-r--r--src/mongo/db/repl/rollback_impl.h246
-rw-r--r--src/mongo/db/repl/rollback_impl_test.cpp284
5 files changed, 844 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 8bf2a589003..149320004a6 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -330,6 +330,41 @@ env.CppUnitTest(
)
env.Library(
+ target='rollback_impl',
+ source=[
+ 'rollback_impl.cpp',
+ ],
+ LIBDEPS=[
+ 'abstract_async_component',
+ 'optime',
+ 'repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ #'$BUILD_DIR/mongo/db/s/sharding', # CYCLE
+ '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
+ '$BUILD_DIR/mongo/util/net/hostandport',
+ ],
+ LIBDEPS_TAGS=[
+ # TODO(ADAM, 2017-01-09): See `CYCLE` tags above
+ 'illegal_cyclic_or_unresolved_dependencies_whitelisted',
+ ]
+)
+
+env.CppUnitTest(
+ target='rollback_impl_test',
+ source=[
+ 'rollback_impl_test.cpp',
+ ],
+ LIBDEPS=[
+ 'oplog_interface_mock',
+ 'rollback_impl',
+ 'rollback_test_fixture',
+ 'task_executor_mock',
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.Library(
target='rollback_source_impl',
source=[
'rollback_source_impl.cpp',
diff --git a/src/mongo/db/repl/rollback.h b/src/mongo/db/repl/rollback.h
new file mode 100644
index 00000000000..6f65ddb9830
--- /dev/null
+++ b/src/mongo/db/repl/rollback.h
@@ -0,0 +1,60 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+namespace repl {
+
+/**
+ * This class represents the interface BackgroundSync and ReplicationCoordinatorExternalState use to
+ * interact with the rollback subsystem.
+ */
+class Rollback {
+ MONGO_DISALLOW_COPYING(Rollback);
+
+public:
+ /**
+ * Callback function to report results of rollback. On success, the last optime applied will be
+ * passed in.
+ */
+ using OnCompletionFn =
+ stdx::function<void(const StatusWith<OpTime>& lastOpTimeApplied) noexcept>;
+
+ Rollback() = default;
+
+ virtual ~Rollback() = default;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
new file mode 100644
index 00000000000..917c378a407
--- /dev/null
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -0,0 +1,219 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/rollback_impl.h"
+
+#include <exception>
+
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/shard_identity_rollback_notifier.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+/**
+ * Creates an operation context using the current Client.
+ */
+ServiceContext::UniqueOperationContext makeOpCtx() {
+ return cc().makeOperationContext();
+}
+
+} // namespace
+
+RollbackImpl::RollbackImpl(executor::TaskExecutor* executor,
+ OplogInterface* localOplog,
+ const HostAndPort& syncSource,
+ int requiredRollbackId,
+ ReplicationCoordinator* replicationCoordinator,
+ StorageInterface* storageInterface,
+ const OnCompletionFn& onCompletion)
+ : AbstractAsyncComponent(executor, "rollback"),
+ _localOplog(localOplog),
+ _syncSource(syncSource),
+ _requiredRollbackId(requiredRollbackId),
+ _replicationCoordinator(replicationCoordinator),
+ _storageInterface(storageInterface),
+ _onCompletion(onCompletion) {
+ // Task executor will be validated by AbstractAsyncComponent's constructor.
+ invariant(localOplog);
+ uassert(ErrorCodes::BadValue, "sync source must be valid", !syncSource.empty());
+ invariant(replicationCoordinator);
+ invariant(storageInterface);
+ invariant(onCompletion);
+}
+
+RollbackImpl::~RollbackImpl() {
+ shutdown();
+ join();
+}
+
+// static
+StatusWith<OpTime> RollbackImpl::readLocalRollbackInfoAndApplyUntilConsistentWithSyncSource(
+ ReplicationCoordinator* replicationCoordinator, StorageInterface* storageInterface) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+Status RollbackImpl::_doStartup_inlock() noexcept {
+ return _scheduleWorkAndSaveHandle_inlock(
+ stdx::bind(&RollbackImpl::_transitionToRollbackCallback, this, stdx::placeholders::_1),
+ &_transitionToRollbackHandle,
+ str::stream() << "_transitionToRollbackCallback");
+}
+
+void RollbackImpl::_doShutdown_inlock() noexcept {
+ _cancelHandle_inlock(_transitionToRollbackHandle);
+}
+
+stdx::mutex* RollbackImpl::_getMutex() noexcept {
+ return &_mutex;
+}
+
+void RollbackImpl::_transitionToRollbackCallback(
+ const executor::TaskExecutor::CallbackArgs& callbackArgs) {
+ auto status = Status::OK();
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ status = _checkForShutdownAndConvertStatus_inlock(
+ callbackArgs, str::stream() << "error before transition to ROLLBACK");
+ }
+ if (!status.isOK()) {
+ _finishCallback(nullptr, status);
+ return;
+ }
+
+ log() << "Rollback - transition to ROLLBACK";
+ auto opCtx = makeOpCtx();
+ {
+ Lock::GlobalWrite globalWrite(opCtx.get());
+
+ if (!_replicationCoordinator->setFollowerMode(MemberState::RS_ROLLBACK)) {
+ std::string msg = str::stream()
+ << "Cannot transition from " << _replicationCoordinator->getMemberState().toString()
+ << " to " << MemberState(MemberState::RS_ROLLBACK).toString();
+ log() << msg;
+ status = Status(ErrorCodes::NotSecondary, msg);
+ }
+ }
+ if (!status.isOK()) {
+ _finishCallback(opCtx.get(), status);
+ return;
+ }
+
+ // Success! For now....
+ _finishCallback(opCtx.get(), OpTime());
+}
+
+void RollbackImpl::_checkShardIdentityRollback(OperationContext* opCtx) {
+ invariant(opCtx);
+
+ if (ShardIdentityRollbackNotifier::get(opCtx)->didRollbackHappen()) {
+ severe() << "shardIdentity document rollback detected. Shutting down to clear "
+ "in-memory sharding state. Restarting this process should safely return it "
+ "to a healthy state";
+ fassertFailedNoTrace(40407);
+ }
+}
+
+void RollbackImpl::_transitionFromRollbackToSecondary(OperationContext* opCtx) {
+ invariant(opCtx);
+
+ Lock::GlobalWrite globalWrite(opCtx);
+
+ // If the current member state is not ROLLBACK, this means that the
+ // ReplicationCoordinator::setFollowerMode(ROLLBACK) call in _transitionToRollbackCallback()
+ // failed. In that case, there's nothing to do in this function.
+ if (MemberState(MemberState::RS_ROLLBACK) != _replicationCoordinator->getMemberState()) {
+ return;
+ }
+
+ if (!_replicationCoordinator->setFollowerMode(MemberState::RS_SECONDARY)) {
+ severe() << "Failed to transition into " << MemberState(MemberState::RS_SECONDARY)
+ << "; expected to be in state " << MemberState(MemberState::RS_ROLLBACK)
+ << " but found self in " << _replicationCoordinator->getMemberState();
+ fassertFailedNoTrace(40408);
+ }
+}
+
+void RollbackImpl::_tearDown(OperationContext* opCtx) {
+ invariant(opCtx);
+
+ _checkShardIdentityRollback(opCtx);
+ _transitionFromRollbackToSecondary(opCtx);
+}
+
+void RollbackImpl::_finishCallback(OperationContext* opCtx, StatusWith<OpTime> lastApplied) {
+ // After running callback function '_onCompletion', clear '_onCompletion' to release any
+ // resources that might be held by this function object.
+ // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
+ // there is any logic that's invoked at the function object's destruction that might call into
+ // this RollbackImpl. 'onCompletion' must be destroyed outside the lock and this should happen
+ // before we transition the state to Complete.
+ decltype(_onCompletion) onCompletion;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_onCompletion);
+ std::swap(_onCompletion, onCompletion);
+ }
+
+ // If 'opCtx' is null, lazily create OperationContext using makeOpCtx() that will last for the
+ // duration of this function call.
+ _tearDown(opCtx ? opCtx : makeOpCtx().get());
+
+ // Completion callback must be invoked outside mutex.
+ try {
+ onCompletion(lastApplied);
+ } catch (...) {
+ severe() << "rollback finish callback threw exception: " << redact(exceptionToStatus());
+ // This exception handling block should be unreachable because OnCompletionFn is declared
+ // noexcept. This is purely a defensive mechanism to guard against C++ runtime
+ // implementations that have less than ideal support for noexcept.
+ MONGO_UNREACHABLE;
+ }
+
+ // Destroy the remaining reference to the completion callback before we transition the state to
+ // Complete so that callers can expect any resources bound to '_onCompletion' to be released
+ // before RollbackImpl::join() returns.
+ onCompletion = {};
+
+ _transitionToComplete();
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h
new file mode 100644
index 00000000000..4dc33a7405c
--- /dev/null
+++ b/src/mongo/db/repl/rollback_impl.h
@@ -0,0 +1,246 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/db/repl/abstract_async_component.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/rollback.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+
+class OperationContext;
+
+namespace repl {
+
+class OplogInterface;
+class ReplicationCoordinator;
+class StorageInterface;
+
+/**
+ * During steady state replication, it is possible to find the local server in a state where it
+ * cannot replicate from a sync source. This can happen if the local server has gone offline and
+ * comes back to find a new primary with an inconsistent set of operations in its oplog from the
+ * local server. For example:
+
+ * F = node that is on the wrong branch of history
+ * SS = sync source (usually primary)
+ *
+ * F : a b c d e f g
+ * SS : a b c d h
+ *
+ * In the example 'e', 'f', and 'g' are getting rolled back, 'h' is what's getting rolled forward.
+ *
+ * This class models the logic necessary to perform this 'rollback' procedure in two phases:
+ *
+ * 1) Information needed to perform the rollback is first read from the sync source and stored
+ * locally. The user-visible server state (all databases except for the 'local' database) is not
+ * changed during this information gathering phase.
+ *
+ * 2) After the first phase is completed, we read the persisted rollback information during the
+ * second phase and logically undo the local operations to the common point before apply the
+ * remote operations ('h' in the example above) forward until we reach a consistent point
+ * (relative to the sync source).
+ */
+class RollbackImpl : public AbstractAsyncComponent, public Rollback {
+public:
+ /**
+ * This constructor is used to create a RollbackImpl instance that will run the entire
+ * rollback algorithm. This is called during steady state replication when we determine that we
+ * have to roll back after processing the first batch of oplog entries from the sync source.
+ *
+ * To resume an interrupted rollback process at startup, the caller has to use the
+ * ReplicationCoordinator to transition the member state to ROLLBACK and invoke
+ * readLocalRollbackInfoAndApplyUntilConsistentWithSyncSource() directly.
+ */
+ RollbackImpl(executor::TaskExecutor* executor,
+ OplogInterface* localOplog,
+ const HostAndPort& syncSource,
+ int requiredRollbackId,
+ ReplicationCoordinator* replicationCoordinator,
+ StorageInterface* storageInterface,
+ const OnCompletionFn& onCompletion);
+
+ virtual ~RollbackImpl();
+
+ /**
+ * This is a static function that will run the remaining work in the rollback algorithm
+ * (Phase 2) after persisting (in Phase 1) all the information we need from the sync source.
+ * This part of the rollback algorithm runs synchronously and may be invoked in one of two
+ * contexts:
+ * 1) as part of an rollback process started by an in-progress RollbackImpl; or
+ * 2) to resume a previously interrupted rollback process at server startup while initializing
+ * the replication subsystem. In this case, RollbackImpl will transition the member state
+ * to ROLLBACK before executing the rest of the rollback procedure.
+ *
+ * This function does not need to communicate with the sync source.
+ *
+ * Returns the last optime applied.
+ */
+ static StatusWith<OpTime> readLocalRollbackInfoAndApplyUntilConsistentWithSyncSource(
+ ReplicationCoordinator* replicationCoordinator, StorageInterface* storageInterface);
+
+private:
+ /**
+ * Schedules the first task of the rollback algorithm, which is to transition to ROLLBACK.
+ * Returns ShutdownInProgress if the first task cannot be scheduled because the task executor is
+ * shutting down.
+ *
+ * Called by AbstractAsyncComponent::startup().
+ */
+ Status _doStartup_inlock() noexcept override;
+
+ /**
+ * Cancels all outstanding work.
+ *
+ * Called by AbstractAsyncComponent::shutdown().
+ */
+ void _doShutdown_inlock() noexcept override;
+
+ /**
+ * Returns mutex to guard this component's state variable.
+ *
+ * Used by AbstractAyncComponent to protect access to the component state stored in '_state'.
+ */
+ stdx::mutex* _getMutex() noexcept override;
+
+ /**
+ * Rollback flowchart (Incomplete):
+ *
+ * _doStartup_inlock()
+ * |
+ * |
+ * V
+ * _transitionToRollbackCallback()
+ * |
+ * |
+ * V
+ * _tearDown()
+ * |
+ * |
+ * V
+ * _finishCallback()
+ */
+
+ /**
+ * Uses the ReplicationCoordinator to transition the current member state to ROLLBACK.
+ * If the transition to ROLLBACK fails, this could mean that we have been elected PRIMARY. In
+ * this case, we invoke the completion function with a NotSecondary error.
+ *
+ * This callback is scheduled by _doStartup_inlock().
+ */
+ void _transitionToRollbackCallback(const executor::TaskExecutor::CallbackArgs& callbackArgs);
+
+ /**
+ * If we detected that we rolled back the shardIdentity document as part of this rollback
+ * then we must shut down the server to clear the in-memory ShardingState associated with the
+ * shardIdentity document.
+ *
+ * 'opCtx' cannot be null.
+ *
+ * Called by _tearDown().
+ */
+ void _checkShardIdentityRollback(OperationContext* opCtx);
+
+ /**
+ * Transitions the current member state from ROLLBACK to SECONDARY.
+ * This operation must succeed. Otherwise, we will shut down the server.
+ *
+ * 'opCtx' cannot be null.
+ *
+ * Called by _tearDown().
+ */
+ void _transitionFromRollbackToSecondary(OperationContext* opCtx);
+
+ /**
+ * Performs tear down steps before caller is notified of completion status.
+ *
+ * 'opCtx' cannot be null.
+ *
+ * Called by _finishCallback() before the completion callback '_onCompletion' is invoked.
+ */
+ void _tearDown(OperationContext* opCtx);
+
+ /**
+ * Invokes completion callback and transitions state to State::kComplete.
+ * _finishCallback() may require an OperationContext to perform certain tear down functions.
+ * It will create its own OperationContext unless the caller provides one in 'opCtx'.
+ *
+ * Calls _tearDown() to perform tear down steps before invoking completion callback.
+ *
+ * Finally, this function transitions the component state to Complete by invoking
+ * AbstractAsyncComponent::_transitionToComplete().
+ */
+ void _finishCallback(OperationContext* opCtx, StatusWith<OpTime> lastApplied);
+
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (R) Read-only in concurrent operation; no synchronization required.
+ // (S) Self-synchronizing; access in any way from any context.
+ // (M) Reads and writes guarded by _mutex.
+
+ // Guards access to member variables.
+ mutable stdx::mutex _mutex; // (S)
+
+ // This is used to read oplog entries from the local oplog that will be rolled back.
+ OplogInterface* const _localOplog; // (R)
+
+ // Host and port of the sync source we are rolling back against.
+ const HostAndPort _syncSource; // (R)
+
+ // This is the current rollback ID on the sync source that we are rolling back against.
+ // It is an error if the rollback ID on the sync source changes before rollback is complete.
+ const int _requiredRollbackId; // (R)
+
+ // This is used to read and update global replication settings. This includes:
+ // - update transition member states;
+ // - update current applied and durable optimes; and
+ // - update global rollback ID (that will be returned by the command replSetGetRBID).
+ ReplicationCoordinator* const _replicationCoordinator; // (R)
+
+ // This is used to read and update the global minValid settings and to access the storage layer.
+ StorageInterface* const _storageInterface; // (R)
+
+ // This is invoked with the final status of the rollback. If startup() fails, this callback
+ // is never invoked. The caller gets the last applied optime when the rollback completes
+ // successfully or an error status.
+ // '_onCompletion' is cleared on completion (in _finishCallback()) in order to release any
+ // resources that might be held by the callback function object.
+ OnCompletionFn _onCompletion; // (M)
+
+ // Handle to currently scheduled _transitionToRollbackCallback() task.
+ executor::TaskExecutor::CallbackHandle _transitionToRollbackHandle; // (M)
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
new file mode 100644
index 00000000000..7bfbe47e255
--- /dev/null
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -0,0 +1,284 @@
+/**
+ * Copyright 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/rollback_test_fixture.h"
+
+#include <memory>
+
+#include "mongo/db/repl/oplog_interface_mock.h"
+#include "mongo/db/repl/rollback_impl.h"
+#include "mongo/db/repl/task_executor_mock.h"
+#include "mongo/db/s/shard_identity_rollback_notifier.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/scopeguard.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+
+const OplogInterfaceMock::Operations kEmptyMockOperations;
+
+/**
+ * Unit test for rollback implementation introduced in 3.6.
+ */
+class RollbackImplTest : public RollbackTest {
+private:
+ void setUp() override;
+ void tearDown() override;
+
+protected:
+ TaskExecutorMock::ShouldFailRequestFn _shouldFailScheduleRemoteCommandRequest;
+ std::unique_ptr<TaskExecutorMock> _taskExecutorMock;
+ Rollback::OnCompletionFn _onCompletion;
+ StatusWith<OpTime> _onCompletionResult = executor::TaskExecutorTest::getDetectableErrorStatus();
+ std::unique_ptr<OplogInterfaceMock> _localOplog;
+ int _requiredRollbackId;
+ std::unique_ptr<RollbackImpl> _rollback;
+};
+
+void RollbackImplTest::setUp() {
+ RollbackTest::setUp();
+ _shouldFailScheduleRemoteCommandRequest = [](const executor::RemoteCommandRequest& request) {
+ return false;
+ };
+ _taskExecutorMock = stdx::make_unique<TaskExecutorMock>(
+ &_threadPoolExecutorTest.getExecutor(),
+ [this](const executor::RemoteCommandRequest& request) {
+ return _shouldFailScheduleRemoteCommandRequest(request);
+ });
+ _localOplog = stdx::make_unique<OplogInterfaceMock>(kEmptyMockOperations);
+ HostAndPort syncSource("localhost", 1234);
+ _requiredRollbackId = 1;
+ _onCompletionResult = executor::TaskExecutorTest::getDetectableErrorStatus();
+ _onCompletion = [this](const StatusWith<OpTime>& result) noexcept {
+ _onCompletionResult = result;
+ };
+ _rollback = stdx::make_unique<RollbackImpl>(
+ _taskExecutorMock.get(),
+ _localOplog.get(),
+ syncSource,
+ _requiredRollbackId,
+ _coordinator,
+ &_storageInterface,
+ [this](const StatusWith<OpTime>& result) noexcept { _onCompletion(result); });
+}
+
+void RollbackImplTest::tearDown() {
+ _threadPoolExecutorTest.shutdownExecutorThread();
+ _threadPoolExecutorTest.joinExecutorThread();
+
+ _rollback = {};
+ _onCompletionResult = executor::TaskExecutorTest::getDetectableErrorStatus();
+ _onCompletion = {};
+ _requiredRollbackId = -1;
+ _localOplog = {};
+ _taskExecutorMock = {};
+ _shouldFailScheduleRemoteCommandRequest = {};
+ RollbackTest::tearDown();
+}
+
+TEST_F(RollbackImplTest, TestFixtureSetUpInitializesStorageEngine) {
+ auto serviceContext = _serviceContextMongoDTest.getServiceContext();
+ ASSERT_TRUE(serviceContext);
+ ASSERT_TRUE(serviceContext->getGlobalStorageEngine());
+}
+
+TEST_F(RollbackImplTest, TestFixtureSetUpInitializesTaskExecutor) {
+ auto net = _threadPoolExecutorTest.getNet();
+ ASSERT_TRUE(net);
+ auto&& executor = _threadPoolExecutorTest.getExecutor();
+ ASSERT_NOT_EQUALS(Date_t(), executor.now());
+}
+
+TEST_F(RollbackImplTest, InvalidConstruction) {
+ auto executor = &_threadPoolExecutorTest.getExecutor();
+ OplogInterfaceMock emptyOplog(kEmptyMockOperations);
+ HostAndPort syncSource("localhost", 1234);
+ int requiredRollbackId = 1;
+ auto replicationCoordinator = _coordinator;
+ auto storageInterface = &_storageInterface;
+ auto onCompletion = [](const StatusWith<OpTime>&) noexcept {};
+
+ // Null task executor.
+ // This check is performed in AbstractAsyncComponent's constructor.
+ ASSERT_THROWS_CODE_AND_WHAT(RollbackImpl(nullptr,
+ &emptyOplog,
+ syncSource,
+ requiredRollbackId,
+ replicationCoordinator,
+ storageInterface,
+ onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
+
+ // Invalid sync source.
+ ASSERT_THROWS_CODE_AND_WHAT(RollbackImpl(executor,
+ &emptyOplog,
+ HostAndPort(),
+ requiredRollbackId,
+ replicationCoordinator,
+ storageInterface,
+ onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "sync source must be valid");
+}
+
+TEST_F(RollbackImplTest,
+ StartupReturnsOperationFailedIfMockExecutorFailsToScheduleRollbackTransitionCallback) {
+ _taskExecutorMock->shouldFailScheduleWork = true;
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _rollback->startup());
+}
+
+TEST_F(
+ RollbackImplTest,
+ RollbackReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingTransitionToRollbackCallback) {
+ _taskExecutorMock->shouldDeferScheduleWorkByOneSecond = true;
+ ASSERT_OK(_rollback->startup());
+ _threadPoolExecutorTest.getExecutor().shutdown();
+ _rollback->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _onCompletionResult);
+}
+
+TEST_F(
+ RollbackImplTest,
+ RollbackReturnsCallbackCanceledIfRollbackIsShutdownAfterSchedulingTransitionToRollbackCallback) {
+ _taskExecutorMock->shouldDeferScheduleWorkByOneSecond = true;
+ ASSERT_OK(_rollback->startup());
+ _rollback->shutdown();
+ _rollback->join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _onCompletionResult);
+}
+
+DEATH_TEST_F(RollbackImplTest, RollbackTerminatesIfCompletionCallbackThrowsException, "terminate") {
+ _taskExecutorMock->shouldDeferScheduleWorkByOneSecond = true;
+ ASSERT_OK(_rollback->startup());
+ _onCompletion = [](const StatusWith<OpTime>&) noexcept {
+ uassertStatusOK({ErrorCodes::InternalError,
+ "exception from RollbackTerminatesIfCompletionCallbackThrowsException"});
+ };
+ _rollback->shutdown();
+ _rollback->join();
+ MONGO_UNREACHABLE;
+}
+
+TEST_F(RollbackImplTest, RollbackReturnsNotSecondaryOnFailingToTransitionToRollback) {
+ _coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_ROLLBACK;
+ ASSERT_OK(_rollback->startup());
+ _rollback->join();
+ ASSERT_EQUALS(ErrorCodes::NotSecondary, _onCompletionResult);
+}
+
+DEATH_TEST_F(RollbackImplTest,
+ RollbackTriggersFatalAssertionOnDetectingShardIdentityDocumentRollback,
+ "shardIdentity document rollback detected. Shutting down to clear in-memory sharding "
+ "state. Restarting this process should safely return it to a healthy state") {
+ ASSERT_FALSE(ShardIdentityRollbackNotifier::get(_opCtx.get())->didRollbackHappen());
+ ShardIdentityRollbackNotifier::get(_opCtx.get())->recordThatRollbackHappened();
+ ASSERT_TRUE(ShardIdentityRollbackNotifier::get(_opCtx.get())->didRollbackHappen());
+
+ ASSERT_OK(_rollback->startup());
+ _rollback->join();
+}
+
+DEATH_TEST_F(
+ RollbackImplTest,
+ RollbackTriggersFatalAssertionOnFailingToTransitionFromRollbackToSecondaryDuringTearDownPhase,
+ "Failed to transition into SECONDARY; expected to be in state ROLLBACK but found self in "
+ "ROLLBACK") {
+ _coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_SECONDARY;
+ ASSERT_OK(_rollback->startup());
+ _rollback->join();
+}
+
+TEST_F(RollbackImplTest, RollbackReturnsLastAppliedOpTimeOnSuccess) {
+ ASSERT_OK(_rollback->startup());
+ _rollback->join();
+ ASSERT_EQUALS(OpTime(), unittest::assertGet(_onCompletionResult));
+}
+
+class SharedCallbackState {
+ MONGO_DISALLOW_COPYING(SharedCallbackState);
+
+public:
+ explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
+ : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {}
+ ~SharedCallbackState() {
+ *_sharedCallbackStateDestroyed = true;
+ }
+
+private:
+ bool* _sharedCallbackStateDestroyed;
+};
+
+TEST_F(RollbackImplTest, RollbackResetsOnCompletionCallbackFunctionPointerUponCompletion) {
+ bool sharedCallbackStateDestroyed = false;
+ auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);
+ decltype(_onCompletionResult) lastApplied = _threadPoolExecutorTest.getDetectableErrorStatus();
+
+ _rollback = stdx::make_unique<RollbackImpl>(
+ _taskExecutorMock.get(),
+ _localOplog.get(),
+ HostAndPort("localhost", 1234),
+ _requiredRollbackId,
+ _coordinator,
+ &_storageInterface,
+ [&lastApplied, sharedCallbackData ](const StatusWith<OpTime>& result) noexcept {
+ lastApplied = result;
+ });
+ ON_BLOCK_EXIT([this]() { _taskExecutorMock->shutdown(); });
+
+ // Completion callback will be invoked on errors after startup() returns successfully.
+ // We cause the the rollback process to error out early by failing to transition to rollback.
+ _coordinator->_failSetFollowerModeOnThisMemberState = MemberState::RS_ROLLBACK;
+
+ ASSERT_OK(_rollback->startup());
+
+ // After 'sharedCallbackData' is reset, RollbackImpl will hold the last reference count to
+ // SharedCallbackState.
+ sharedCallbackData.reset();
+ ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+ _rollback->join();
+ ASSERT_EQUALS(ErrorCodes::NotSecondary, lastApplied);
+
+ // RollbackImpl should reset 'RollbackImpl::_onCompletion' after running callback function
+ // for the last time before becoming inactive.
+ // This ensures that we release resources associated with 'RollbackImpl::_onCompletion'.
+ ASSERT_TRUE(sharedCallbackStateDestroyed);
+}
+
+} // namespace