summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2016-05-26 17:50:05 -0400
committerJudah Schvimer <judah@mongodb.com>2016-05-26 17:50:05 -0400
commit832d65c6eec6eb4314b2700a18a332185b5baa36 (patch)
treef323f092748598aa783f55705ce04728804567b5
parentb6221d5e2f3e95221d73947bf0fba6772b19e49b (diff)
downloadmongo-832d65c6eec6eb4314b2700a18a332185b5baa36.tar.gz
SERVER-22244 Restart initial sync on sync source rollback
-rw-r--r--src/mongo/db/repl/SConscript23
-rw-r--r--src/mongo/db/repl/data_replicator.cpp12
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp65
-rw-r--r--src/mongo/db/repl/rollback_checker.cpp158
-rw-r--r--src/mongo/db/repl/rollback_checker.h127
-rw-r--r--src/mongo/db/repl/rollback_checker_test.cpp186
6 files changed, 570 insertions, 1 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d83a3609938..99b443db2bf 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -900,6 +900,7 @@ env.Library(
'oplog_fetcher',
'optime',
'reporter',
+ 'rollback_checker',
'storage_interface',
'$BUILD_DIR/mongo/client/fetcher',
],
@@ -920,6 +921,28 @@ env.CppUnitTest(
)
env.Library(
+ target='rollback_checker',
+ source=[
+ 'rollback_checker.cpp',
+ ],
+ LIBDEPS=[
+ 'replication_executor',
+ ],
+)
+
+env.CppUnitTest(
+ target='rollback_checker_test',
+ source=[
+ 'rollback_checker_test.cpp',
+ ],
+ LIBDEPS=[
+ 'replication_executor_test_fixture',
+ 'rollback_checker',
+ '$BUILD_DIR/mongo/unittest/concurrency',
+ ],
+)
+
+env.Library(
target='roll_back_local_operations',
source=[
'roll_back_local_operations.cpp',
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 41e15b19daf..f09013af383 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/rollback_checker.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -649,6 +650,13 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) {
attemptErrorStatus = _ensureGoodSyncSource_inlock();
}
+ RollbackChecker rollbackChecker(_exec, _syncSource);
+ if (attemptErrorStatus.isOK()) {
+ lk.unlock();
+ attemptErrorStatus = rollbackChecker.reset_sync();
+ lk.lock();
+ }
+
if (attemptErrorStatus.isOK()) {
StatusWith<Event> status = _exec->makeEvent();
if (!status.isOK()) {
@@ -700,6 +708,10 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) {
_initialSyncState->dbsCloner.start(); // When the cloner is done applier starts.
invariant(_initialSyncState->finishEvent.isValid());
_exec->waitForEvent(_initialSyncState->finishEvent);
+ if (rollbackChecker.hasHadRollback()) {
+ _initialSyncState->setStatus(Status(ErrorCodes::UnrecoverableRollbackError,
+ "Rollback occurred during initial sync"));
+ }
attemptErrorStatus = _initialSyncState->status;
// Re-lock DataReplicator Internals
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 939989d8aff..647b623e689 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -450,6 +450,8 @@ TEST_F(InitialSyncTest, Complete) {
*/
const std::vector<BSONObj> responses = {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
// get latest oplog ts
fromjson(
str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
@@ -485,6 +487,8 @@ TEST_F(InitialSyncTest, Complete) {
"{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
<< ", op:'i', o:{_id:1, c:1}}]}}"),
// Applier starts ...
+ // check for rollback
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
};
// Initial sync flag should not be set before starting.
@@ -525,6 +529,8 @@ TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) {
};
const std::vector<BSONObj> responses = {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
// get latest oplog ts
fromjson(
str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
@@ -562,6 +568,8 @@ TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) {
"{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
"{_id:1, a:1} "
"]}}"),
+ // check for rollback
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
};
startSync();
setResponses(responses);
@@ -601,6 +609,8 @@ TEST_F(InitialSyncTest, Failpoint) {
TEST_F(InitialSyncTest, FailsOnClone) {
const std::vector<BSONObj> responses = {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
// get latest oplog ts
fromjson(
str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
@@ -613,13 +623,66 @@ TEST_F(InitialSyncTest, FailsOnClone) {
<< ", op:'i', o:{_id:1, a:1}}]}}"),
// Clone Start
// listDatabases
- fromjson("{ok:0}")};
+ fromjson("{ok:0}"),
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ };
startSync();
setResponses(responses);
playResponses(true);
verifySync(ErrorCodes::InitialSyncFailure);
}
+TEST_F(InitialSyncTest, FailOnRollback) {
+ const std::vector<BSONObj> responses = {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ // get latest oplog ts
+ fromjson(
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // oplog fetcher find
+ fromjson(
+ str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // Clone Start
+ // listDatabases
+ fromjson("{ok:1, databases:[{name:'a'}]}"),
+ // listCollections for "a"
+ fromjson(
+ "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}"),
+ // listIndexes:a
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:" << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
+ // find:a
+ fromjson(
+ "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}"),
+ // Clone Done
+ // get latest oplog ts
+ fromjson(
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, c:1}}]}}"),
+ // Applier starts ...
+ // check for rollback
+ fromjson(str::stream() << "{ok: 1, rbid:2}"),
+ };
+
+ startSync();
+ setResponses({responses});
+ playResponses(true);
+ verifySync(ErrorCodes::InitialSyncFailure);
+}
+
+
class TestSyncSourceSelector2 : public SyncSourceSelector {
public:
void clearSyncSourceBlacklist() override {}
diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp
new file mode 100644
index 00000000000..7d1a710982a
--- /dev/null
+++ b/src/mongo/db/repl/rollback_checker.cpp
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/rollback_checker.h"
+
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+using UniqueLock = stdx::unique_lock<stdx::mutex>;
+
+RollbackChecker::RollbackChecker(executor::TaskExecutor* executor, HostAndPort syncSource)
+ : _executor(executor), _syncSource(syncSource), _baseRBID(-1), _lastRBID(-1) {
+ uassert(ErrorCodes::BadValue, "null task executor", executor);
+}
+
+RollbackChecker::~RollbackChecker() {}
+
+RollbackChecker::CallbackHandle RollbackChecker::checkForRollback(const CallbackFn& nextAction) {
+ return _scheduleGetRollbackId([this, nextAction](const RemoteCommandCallbackArgs& args) {
+ if (args.response.getStatus() == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ if (!args.response.isOK()) {
+ nextAction(args.response.getStatus());
+ return;
+ }
+ if (auto rbidElement = args.response.getValue().data["rbid"]) {
+ int remoteRBID = rbidElement.numberInt();
+
+ UniqueLock lk(_mutex);
+ bool hadRollback = _checkForRollback_inlock(remoteRBID);
+ lk.unlock();
+
+ if (hadRollback) {
+ nextAction(Status(ErrorCodes::UnrecoverableRollbackError,
+ "RollbackChecker detected rollback occurred"));
+ } else {
+ nextAction(Status::OK());
+ }
+ } else {
+ nextAction(Status(ErrorCodes::CommandFailed,
+ "replSetGetRBID command failed when checking for rollback"));
+ }
+ }, nextAction);
+}
+
+bool RollbackChecker::hasHadRollback() {
+ // Default to true in case the callback is not called in an error case.
+ bool hasHadRollback = true;
+ auto cbh = checkForRollback(
+ [&hasHadRollback](const Status& status) { hasHadRollback = !status.isOK(); });
+ _executor->wait(cbh);
+ return hasHadRollback;
+}
+
+RollbackChecker::CallbackHandle RollbackChecker::reset(const CallbackFn& nextAction) {
+ return _scheduleGetRollbackId([this, nextAction](const RemoteCommandCallbackArgs& args) {
+ if (args.response.getStatus() == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ if (!args.response.isOK()) {
+ nextAction(args.response.getStatus());
+ return;
+ }
+ if (auto rbidElement = args.response.getValue().data["rbid"]) {
+ int newRBID = rbidElement.numberInt();
+
+ UniqueLock lk(_mutex);
+ _setRBID_inlock(newRBID);
+ lk.unlock();
+
+ nextAction(Status::OK());
+ } else {
+ nextAction(Status(ErrorCodes::CommandFailed,
+ "replSetGetRBID command failed when checking for rollback"));
+ }
+ }, nextAction);
+}
+
+Status RollbackChecker::reset_sync() {
+ // Default to an error in case the callback is not called in an error case.
+ Status resetStatus = Status(ErrorCodes::CommandFailed, "RollbackChecker reset failed");
+ auto cbh = reset([&resetStatus](const Status& status) { resetStatus = status; });
+ _executor->wait(cbh);
+ return resetStatus;
+}
+
+int RollbackChecker::getBaseRBID_forTest() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _baseRBID;
+}
+
+int RollbackChecker::getLastRBID_forTest() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _lastRBID;
+}
+
+bool RollbackChecker::_checkForRollback_inlock(int remoteRBID) {
+ _lastRBID = remoteRBID;
+ return remoteRBID != _baseRBID;
+}
+
+RollbackChecker::CallbackHandle RollbackChecker::_scheduleGetRollbackId(
+ const RemoteCommandCallbackFn& nextAction, const CallbackFn& errorFn) {
+ executor::RemoteCommandRequest getRollbackIDReq(
+ _syncSource, "admin", BSON("replSetGetRBID" << 1));
+ auto cbh = _executor->scheduleRemoteCommand(getRollbackIDReq, nextAction);
+
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return RollbackChecker::CallbackHandle();
+ }
+ if (!cbh.isOK()) {
+ errorFn(cbh.getStatus());
+ return RollbackChecker::CallbackHandle();
+ }
+ return cbh.getValue();
+}
+
+void RollbackChecker::_setRBID_inlock(int rbid) {
+ _baseRBID = rbid;
+ _lastRBID = rbid;
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_checker.h b/src/mongo/db/repl/rollback_checker.h
new file mode 100644
index 00000000000..1d48e238512
--- /dev/null
+++ b/src/mongo/db/repl/rollback_checker.h
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/executor/task_executor.h"
+
+namespace mongo {
+namespace repl {
+
+class Mutex;
+
+/**
+ * The RollbackChecker maintains a sync source and its baseline rollback ID (rbid). It
+ * contains methods to check if a rollback occurred by checking if the rbid has changed since
+ * the baseline was set.
+ *
+ * To use the RollbackChecker:
+ * 1) Create a RollbackChecker by passing in an executor, and a sync source to use. If the
+ * sync source changes, make a new RollbackChecker.
+ * 2) Call reset(), either synchronously or asynchronously, so that the RollbackChecker retrieves
+ * the state it needs from the sync source to check for rollbacks.
+ * 3) Call checkForRollback(), passing in the next action to check if there was a rollback at the
+ * sync source. If there is a rollback or another error, the error function provided will be
+ * called. This error could be an UnrecoverableRollbackError, when there is a rollback.
+ * Alternatively, call hasHadRollback() which synchonously checks for a rollback and returns
+ * true if any error occurs. Checking for a rollback does not reset the baseline rbid.
+ * 4) Repeat steps 2 and 3 as needed.
+ *
+ */
+class RollbackChecker {
+ MONGO_DISALLOW_COPYING(RollbackChecker);
+
+public:
+ using CallbackFn = stdx::function<void(const Status& status)>;
+ using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
+ using CallbackHandle = executor::TaskExecutor::CallbackHandle;
+
+ RollbackChecker(executor::TaskExecutor* executor, HostAndPort syncSource);
+
+ virtual ~RollbackChecker();
+
+ // Checks whether the the sync source has had a rollback since the last time reset was called,
+ // and then calls the nextAction with a status specifying what should occur next. The status
+ // will either be OK if there was no rollback and no error, UnrecoverableRollbackError if there
+ // was a rollback, or another status if the command failed. The nextAction should account for
+ // each of these cases.
+ CallbackHandle checkForRollback(const CallbackFn& nextAction);
+
+ // Synchronously checks if there has been a rollback and returns a boolean specifying if one
+ // has occurred. If any error occurs this will return true.
+ bool hasHadRollback();
+
+ // Resets the state used to decide if a rollback occurs, and then calls the nextAction with a
+ // status specifying what should occur next. The status will either be OK if there was no
+ // error or another status if the command failed. The nextAction should account for
+ // each of these cases.
+ CallbackHandle reset(const CallbackFn& nextAction);
+
+ // Synchronously calls reset and returns the Status of the command.
+ Status reset_sync();
+
+ // ================== Test Support API ===================
+
+ // Returns the current baseline rbid.
+ int getBaseRBID_forTest();
+
+ // Returns the last rbid seen.
+ int getLastRBID_forTest();
+
+private:
+ // Assumes a lock has been taken. Returns if a rollback has occurred by comparing the remoteRBID
+ // provided and the stored baseline rbid. Sets _lastRBID to the remoteRBID provided.
+ bool _checkForRollback_inlock(int remoteRBID);
+
+ // Schedules a remote command to get the rbid at the sync source and then calls the nextAction.
+ // If there is an error scheduling the call, it calls the errorFn with the status.
+ CallbackHandle _scheduleGetRollbackId(const RemoteCommandCallbackFn& nextAction,
+ const CallbackFn& errorFn);
+
+ // Assumes a lock has been taken. Sets the current rbid used as the baseline for rollbacks.
+ void _setRBID_inlock(int rbid);
+
+ // Not owned by us.
+ executor::TaskExecutor* const _executor;
+
+ // Protects member data of this RollbackChecker.
+ mutable stdx::mutex _mutex;
+
+ // The sync source to check for rollbacks against.
+ HostAndPort _syncSource;
+
+ // The baseline rbid of the sync source to use when deciding if a rollback should occur.
+ int _baseRBID;
+
+ // The last rbid that we saw.
+ int _lastRBID;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp
new file mode 100644
index 00000000000..bf7d53ce610
--- /dev/null
+++ b/src/mongo/db/repl/rollback_checker_test.cpp
@@ -0,0 +1,186 @@
+
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_executor_test_fixture.h"
+#include "mongo/db/repl/rollback_checker.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/util/log.h"
+
+#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/unittest.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+using executor::NetworkInterfaceMock;
+using executor::RemoteCommandResponse;
+
+class RollbackCheckerTest : public ReplicationExecutorTest {
+public:
+ RollbackChecker* getRollbackChecker() const;
+
+ void scheduleNetworkResponse(const BSONObj& obj) {
+ NetworkInterfaceMock* net = getNet();
+ ASSERT_TRUE(net->hasReadyRequests());
+ scheduleNetworkResponse(net->getNextReadyRequest(), obj);
+ }
+
+ void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,
+ const BSONObj& obj) {
+ NetworkInterfaceMock* net = getNet();
+ Milliseconds millis(0);
+ RemoteCommandResponse response(obj, BSONObj(), millis);
+ ReplicationExecutor::ResponseStatus responseStatus(response);
+ net->scheduleResponse(noi, net->now(), responseStatus);
+ }
+
+protected:
+ void setUp() override;
+
+ std::unique_ptr<RollbackChecker> _rollbackChecker;
+ bool _hasRolledBack;
+ bool _hasCalledCallback;
+ mutable stdx::mutex _mutex;
+};
+
+void RollbackCheckerTest::setUp() {
+ ReplicationExecutorTest::setUp();
+ launchExecutorThread();
+ _rollbackChecker = stdx::make_unique<RollbackChecker>(&getReplExecutor(), HostAndPort());
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _hasRolledBack = false;
+ _hasCalledCallback = false;
+}
+
+RollbackChecker* RollbackCheckerTest::getRollbackChecker() const {
+ return _rollbackChecker.get();
+}
+
+TEST_F(RollbackCheckerTest, InvalidConstruction) {
+ HostAndPort syncSource;
+
+ // Null executor.
+ ASSERT_THROWS_CODE(RollbackChecker(nullptr, syncSource), UserException, ErrorCodes::BadValue);
+}
+
+TEST_F(RollbackCheckerTest, ShutdownBeforeStart) {
+ auto callback = [](const Status& args) {};
+ getReplExecutor().shutdown();
+ ASSERT(!getRollbackChecker()->reset(callback));
+ ASSERT(!getRollbackChecker()->checkForRollback(callback));
+}
+
+TEST_F(RollbackCheckerTest, reset) {
+ auto callback = [this](const Status& status) {};
+ auto cbh = getRollbackChecker()->reset(callback);
+ ASSERT(cbh);
+
+ auto commandResponse = BSON("ok" << 1 << "rbid" << 3);
+ scheduleNetworkResponse(commandResponse);
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+
+ getReplExecutor().wait(cbh);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+}
+
+TEST_F(RollbackCheckerTest, RollbackRBID) {
+ auto callback = [this](const Status& status) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (status.isOK()) {
+ _hasCalledCallback = true;
+ } else if (status.code() == ErrorCodes::UnrecoverableRollbackError) {
+ _hasRolledBack = true;
+ }
+ };
+ // First set the RBID to 3.
+ auto refreshCBH = getRollbackChecker()->reset([](const Status& status) {});
+ ASSERT(refreshCBH);
+ auto commandResponse = BSON("ok" << 1 << "rbid" << 3);
+ scheduleNetworkResponse(commandResponse);
+ getNet()->runReadyNetworkOperations();
+ getReplExecutor().wait(refreshCBH);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+
+ // Check for rollback
+ auto rbCBH = getRollbackChecker()->checkForRollback(callback);
+ ASSERT(rbCBH);
+
+ commandResponse = BSON("ok" << 1 << "rbid" << 4);
+ scheduleNetworkResponse(commandResponse);
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+
+ getReplExecutor().wait(rbCBH);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ ASSERT_TRUE(_hasRolledBack);
+ ASSERT_FALSE(_hasCalledCallback);
+ ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 4);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+}
+
+TEST_F(RollbackCheckerTest, NoRollbackRBID) {
+ auto callback = [this](const Status& status) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (status.isOK()) {
+ _hasCalledCallback = true;
+ } else if (status.code() == ErrorCodes::UnrecoverableRollbackError) {
+ _hasRolledBack = true;
+ }
+ };
+ // First set the RBID to 3.
+ auto refreshCBH = getRollbackChecker()->reset(callback);
+ ASSERT(refreshCBH);
+ auto commandResponse = BSON("ok" << 1 << "rbid" << 3);
+ scheduleNetworkResponse(commandResponse);
+ getNet()->runReadyNetworkOperations();
+ getReplExecutor().wait(refreshCBH);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+
+ // Check for rollback
+ auto rbCBH = getRollbackChecker()->checkForRollback(callback);
+ ASSERT(rbCBH);
+
+ commandResponse = BSON("ok" << 1 << "rbid" << 3);
+ scheduleNetworkResponse(commandResponse);
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+
+ getReplExecutor().wait(rbCBH);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ ASSERT_FALSE(_hasRolledBack);
+ ASSERT_TRUE(_hasCalledCallback);
+ ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 3);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+}
+} // namespace