diff options
author | Judah Schvimer <judah@mongodb.com> | 2016-05-26 17:50:05 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2016-05-26 17:50:05 -0400 |
commit | 832d65c6eec6eb4314b2700a18a332185b5baa36 (patch) | |
tree | f323f092748598aa783f55705ce04728804567b5 /src/mongo | |
parent | b6221d5e2f3e95221d73947bf0fba6772b19e49b (diff) | |
download | mongo-832d65c6eec6eb4314b2700a18a332185b5baa36.tar.gz |
SERVER-22244 Restart initial sync on sync source rollback
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/SConscript | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_checker.cpp | 158 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_checker.h | 127 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_checker_test.cpp | 186 |
6 files changed, 570 insertions, 1 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d83a3609938..99b443db2bf 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -900,6 +900,7 @@ env.Library( 'oplog_fetcher', 'optime', 'reporter', + 'rollback_checker', 'storage_interface', '$BUILD_DIR/mongo/client/fetcher', ], @@ -920,6 +921,28 @@ env.CppUnitTest( ) env.Library( + target='rollback_checker', + source=[ + 'rollback_checker.cpp', + ], + LIBDEPS=[ + 'replication_executor', + ], +) + +env.CppUnitTest( + target='rollback_checker_test', + source=[ + 'rollback_checker_test.cpp', + ], + LIBDEPS=[ + 'replication_executor_test_fixture', + 'rollback_checker', + '$BUILD_DIR/mongo/unittest/concurrency', + ], +) + +env.Library( target='roll_back_local_operations', source=[ 'roll_back_local_operations.cpp', diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 41e15b19daf..f09013af383 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/rollback_checker.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -649,6 +650,13 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) { attemptErrorStatus = _ensureGoodSyncSource_inlock(); } + RollbackChecker rollbackChecker(_exec, _syncSource); + if (attemptErrorStatus.isOK()) { + lk.unlock(); + attemptErrorStatus = rollbackChecker.reset_sync(); + lk.lock(); + } + if (attemptErrorStatus.isOK()) { StatusWith<Event> status = _exec->makeEvent(); if (!status.isOK()) { @@ -700,6 +708,10 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) { _initialSyncState->dbsCloner.start(); // When the cloner is done applier starts. invariant(_initialSyncState->finishEvent.isValid()); _exec->waitForEvent(_initialSyncState->finishEvent); + if (rollbackChecker.hasHadRollback()) { + _initialSyncState->setStatus(Status(ErrorCodes::UnrecoverableRollbackError, + "Rollback occurred during initial sync")); + } attemptErrorStatus = _initialSyncState->status; // Re-lock DataReplicator Internals diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 939989d8aff..647b623e689 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -450,6 +450,8 @@ TEST_F(InitialSyncTest, Complete) { */ const std::vector<BSONObj> responses = { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), // get latest oplog ts fromjson( str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" @@ -485,6 +487,8 @@ TEST_F(InitialSyncTest, Complete) { "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion << ", op:'i', o:{_id:1, c:1}}]}}"), // Applier starts ... + // check for rollback + fromjson(str::stream() << "{ok: 1, rbid:1}"), }; // Initial sync flag should not be set before starting. @@ -525,6 +529,8 @@ TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) { }; const std::vector<BSONObj> responses = { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), // get latest oplog ts fromjson( str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" @@ -562,6 +568,8 @@ TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) { "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" "{_id:1, a:1} " "]}}"), + // check for rollback + fromjson(str::stream() << "{ok: 1, rbid:1}"), }; startSync(); setResponses(responses); @@ -601,6 +609,8 @@ TEST_F(InitialSyncTest, Failpoint) { TEST_F(InitialSyncTest, FailsOnClone) { const std::vector<BSONObj> responses = { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), // get latest oplog ts fromjson( str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" @@ -613,13 +623,66 @@ TEST_F(InitialSyncTest, FailsOnClone) { << ", op:'i', o:{_id:1, a:1}}]}}"), // Clone Start // listDatabases - fromjson("{ok:0}")}; + fromjson("{ok:0}"), + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), + }; startSync(); setResponses(responses); playResponses(true); verifySync(ErrorCodes::InitialSyncFailure); } +TEST_F(InitialSyncTest, FailOnRollback) { + const std::vector<BSONObj> responses = { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), + // get latest oplog ts + fromjson( + str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // oplog fetcher find + fromjson( + str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // Clone Start + // listDatabases + fromjson("{ok:1, databases:[{name:'a'}]}"), + // listCollections for "a" + fromjson( + "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}"), + // listIndexes:a + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:" << OplogEntry::kOplogVersion + << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), + // find:a + fromjson( + "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}"), + // Clone Done + // get latest oplog ts + fromjson( + str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, c:1}}]}}"), + // Applier starts ... + // check for rollback + fromjson(str::stream() << "{ok: 1, rbid:2}"), + }; + + startSync(); + setResponses({responses}); + playResponses(true); + verifySync(ErrorCodes::InitialSyncFailure); +} + + class TestSyncSourceSelector2 : public SyncSourceSelector { public: void clearSyncSourceBlacklist() override {} diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp new file mode 100644 index 00000000000..7d1a710982a --- /dev/null +++ b/src/mongo/db/repl/rollback_checker.cpp @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/rollback_checker.h" + +#include "mongo/stdx/mutex.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + +using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; +using UniqueLock = stdx::unique_lock<stdx::mutex>; + +RollbackChecker::RollbackChecker(executor::TaskExecutor* executor, HostAndPort syncSource) + : _executor(executor), _syncSource(syncSource), _baseRBID(-1), _lastRBID(-1) { + uassert(ErrorCodes::BadValue, "null task executor", executor); +} + +RollbackChecker::~RollbackChecker() {} + +RollbackChecker::CallbackHandle RollbackChecker::checkForRollback(const CallbackFn& nextAction) { + return _scheduleGetRollbackId([this, nextAction](const RemoteCommandCallbackArgs& args) { + if (args.response.getStatus() == ErrorCodes::CallbackCanceled) { + return; + } + if (!args.response.isOK()) { + nextAction(args.response.getStatus()); + return; + } + if (auto rbidElement = args.response.getValue().data["rbid"]) { + int remoteRBID = rbidElement.numberInt(); + + UniqueLock lk(_mutex); + bool hadRollback = _checkForRollback_inlock(remoteRBID); + lk.unlock(); + + if (hadRollback) { + nextAction(Status(ErrorCodes::UnrecoverableRollbackError, + "RollbackChecker detected rollback occurred")); + } else { + nextAction(Status::OK()); + } + } else { + nextAction(Status(ErrorCodes::CommandFailed, + "replSetGetRBID command failed when checking for rollback")); + } + }, nextAction); +} + +bool RollbackChecker::hasHadRollback() { + // Default to true in case the callback is not called in an error case. + bool hasHadRollback = true; + auto cbh = checkForRollback( + [&hasHadRollback](const Status& status) { hasHadRollback = !status.isOK(); }); + _executor->wait(cbh); + return hasHadRollback; +} + +RollbackChecker::CallbackHandle RollbackChecker::reset(const CallbackFn& nextAction) { + return _scheduleGetRollbackId([this, nextAction](const RemoteCommandCallbackArgs& args) { + if (args.response.getStatus() == ErrorCodes::CallbackCanceled) { + return; + } + if (!args.response.isOK()) { + nextAction(args.response.getStatus()); + return; + } + if (auto rbidElement = args.response.getValue().data["rbid"]) { + int newRBID = rbidElement.numberInt(); + + UniqueLock lk(_mutex); + _setRBID_inlock(newRBID); + lk.unlock(); + + nextAction(Status::OK()); + } else { + nextAction(Status(ErrorCodes::CommandFailed, + "replSetGetRBID command failed when checking for rollback")); + } + }, nextAction); +} + +Status RollbackChecker::reset_sync() { + // Default to an error in case the callback is not called in an error case. + Status resetStatus = Status(ErrorCodes::CommandFailed, "RollbackChecker reset failed"); + auto cbh = reset([&resetStatus](const Status& status) { resetStatus = status; }); + _executor->wait(cbh); + return resetStatus; +} + +int RollbackChecker::getBaseRBID_forTest() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _baseRBID; +} + +int RollbackChecker::getLastRBID_forTest() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _lastRBID; +} + +bool RollbackChecker::_checkForRollback_inlock(int remoteRBID) { + _lastRBID = remoteRBID; + return remoteRBID != _baseRBID; +} + +RollbackChecker::CallbackHandle RollbackChecker::_scheduleGetRollbackId( + const RemoteCommandCallbackFn& nextAction, const CallbackFn& errorFn) { + executor::RemoteCommandRequest getRollbackIDReq( + _syncSource, "admin", BSON("replSetGetRBID" << 1)); + auto cbh = _executor->scheduleRemoteCommand(getRollbackIDReq, nextAction); + + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return RollbackChecker::CallbackHandle(); + } + if (!cbh.isOK()) { + errorFn(cbh.getStatus()); + return RollbackChecker::CallbackHandle(); + } + return cbh.getValue(); +} + +void RollbackChecker::_setRBID_inlock(int rbid) { + _baseRBID = rbid; + _lastRBID = rbid; +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rollback_checker.h b/src/mongo/db/repl/rollback_checker.h new file mode 100644 index 00000000000..1d48e238512 --- /dev/null +++ b/src/mongo/db/repl/rollback_checker.h @@ -0,0 +1,127 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/executor/task_executor.h" + +namespace mongo { +namespace repl { + +class Mutex; + +/** + * The RollbackChecker maintains a sync source and its baseline rollback ID (rbid). It + * contains methods to check if a rollback occurred by checking if the rbid has changed since + * the baseline was set. + * + * To use the RollbackChecker: + * 1) Create a RollbackChecker by passing in an executor, and a sync source to use. If the + * sync source changes, make a new RollbackChecker. + * 2) Call reset(), either synchronously or asynchronously, so that the RollbackChecker retrieves + * the state it needs from the sync source to check for rollbacks. + * 3) Call checkForRollback(), passing in the next action to check if there was a rollback at the + * sync source. If there is a rollback or another error, the error function provided will be + * called. This error could be an UnrecoverableRollbackError, when there is a rollback. + * Alternatively, call hasHadRollback() which synchonously checks for a rollback and returns + * true if any error occurs. Checking for a rollback does not reset the baseline rbid. + * 4) Repeat steps 2 and 3 as needed. + * + */ +class RollbackChecker { + MONGO_DISALLOW_COPYING(RollbackChecker); + +public: + using CallbackFn = stdx::function<void(const Status& status)>; + using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; + using CallbackHandle = executor::TaskExecutor::CallbackHandle; + + RollbackChecker(executor::TaskExecutor* executor, HostAndPort syncSource); + + virtual ~RollbackChecker(); + + // Checks whether the the sync source has had a rollback since the last time reset was called, + // and then calls the nextAction with a status specifying what should occur next. The status + // will either be OK if there was no rollback and no error, UnrecoverableRollbackError if there + // was a rollback, or another status if the command failed. The nextAction should account for + // each of these cases. + CallbackHandle checkForRollback(const CallbackFn& nextAction); + + // Synchronously checks if there has been a rollback and returns a boolean specifying if one + // has occurred. If any error occurs this will return true. + bool hasHadRollback(); + + // Resets the state used to decide if a rollback occurs, and then calls the nextAction with a + // status specifying what should occur next. The status will either be OK if there was no + // error or another status if the command failed. The nextAction should account for + // each of these cases. + CallbackHandle reset(const CallbackFn& nextAction); + + // Synchronously calls reset and returns the Status of the command. + Status reset_sync(); + + // ================== Test Support API =================== + + // Returns the current baseline rbid. + int getBaseRBID_forTest(); + + // Returns the last rbid seen. + int getLastRBID_forTest(); + +private: + // Assumes a lock has been taken. Returns if a rollback has occurred by comparing the remoteRBID + // provided and the stored baseline rbid. Sets _lastRBID to the remoteRBID provided. + bool _checkForRollback_inlock(int remoteRBID); + + // Schedules a remote command to get the rbid at the sync source and then calls the nextAction. + // If there is an error scheduling the call, it calls the errorFn with the status. + CallbackHandle _scheduleGetRollbackId(const RemoteCommandCallbackFn& nextAction, + const CallbackFn& errorFn); + + // Assumes a lock has been taken. Sets the current rbid used as the baseline for rollbacks. + void _setRBID_inlock(int rbid); + + // Not owned by us. + executor::TaskExecutor* const _executor; + + // Protects member data of this RollbackChecker. + mutable stdx::mutex _mutex; + + // The sync source to check for rollbacks against. + HostAndPort _syncSource; + + // The baseline rbid of the sync source to use when deciding if a rollback should occur. + int _baseRBID; + + // The last rbid that we saw. + int _lastRBID; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp new file mode 100644 index 00000000000..bf7d53ce610 --- /dev/null +++ b/src/mongo/db/repl/rollback_checker_test.cpp @@ -0,0 +1,186 @@ + +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/db/repl/rollback_checker.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/util/log.h" + +#include "mongo/unittest/barrier.h" +#include "mongo/unittest/unittest.h" + +namespace { + +using namespace mongo; +using namespace mongo::repl; +using executor::NetworkInterfaceMock; +using executor::RemoteCommandResponse; + +class RollbackCheckerTest : public ReplicationExecutorTest { +public: + RollbackChecker* getRollbackChecker() const; + + void scheduleNetworkResponse(const BSONObj& obj) { + NetworkInterfaceMock* net = getNet(); + ASSERT_TRUE(net->hasReadyRequests()); + scheduleNetworkResponse(net->getNextReadyRequest(), obj); + } + + void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi, + const BSONObj& obj) { + NetworkInterfaceMock* net = getNet(); + Milliseconds millis(0); + RemoteCommandResponse response(obj, BSONObj(), millis); + ReplicationExecutor::ResponseStatus responseStatus(response); + net->scheduleResponse(noi, net->now(), responseStatus); + } + +protected: + void setUp() override; + + std::unique_ptr<RollbackChecker> _rollbackChecker; + bool _hasRolledBack; + bool _hasCalledCallback; + mutable stdx::mutex _mutex; +}; + +void RollbackCheckerTest::setUp() { + ReplicationExecutorTest::setUp(); + launchExecutorThread(); + _rollbackChecker = stdx::make_unique<RollbackChecker>(&getReplExecutor(), HostAndPort()); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _hasRolledBack = false; + _hasCalledCallback = false; +} + +RollbackChecker* RollbackCheckerTest::getRollbackChecker() const { + return _rollbackChecker.get(); +} + +TEST_F(RollbackCheckerTest, InvalidConstruction) { + HostAndPort syncSource; + + // Null executor. + ASSERT_THROWS_CODE(RollbackChecker(nullptr, syncSource), UserException, ErrorCodes::BadValue); +} + +TEST_F(RollbackCheckerTest, ShutdownBeforeStart) { + auto callback = [](const Status& args) {}; + getReplExecutor().shutdown(); + ASSERT(!getRollbackChecker()->reset(callback)); + ASSERT(!getRollbackChecker()->checkForRollback(callback)); +} + +TEST_F(RollbackCheckerTest, reset) { + auto callback = [this](const Status& status) {}; + auto cbh = getRollbackChecker()->reset(callback); + ASSERT(cbh); + + auto commandResponse = BSON("ok" << 1 << "rbid" << 3); + scheduleNetworkResponse(commandResponse); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + + getReplExecutor().wait(cbh); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); +} + +TEST_F(RollbackCheckerTest, RollbackRBID) { + auto callback = [this](const Status& status) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (status.isOK()) { + _hasCalledCallback = true; + } else if (status.code() == ErrorCodes::UnrecoverableRollbackError) { + _hasRolledBack = true; + } + }; + // First set the RBID to 3. + auto refreshCBH = getRollbackChecker()->reset([](const Status& status) {}); + ASSERT(refreshCBH); + auto commandResponse = BSON("ok" << 1 << "rbid" << 3); + scheduleNetworkResponse(commandResponse); + getNet()->runReadyNetworkOperations(); + getReplExecutor().wait(refreshCBH); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + + // Check for rollback + auto rbCBH = getRollbackChecker()->checkForRollback(callback); + ASSERT(rbCBH); + + commandResponse = BSON("ok" << 1 << "rbid" << 4); + scheduleNetworkResponse(commandResponse); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + + getReplExecutor().wait(rbCBH); + stdx::lock_guard<stdx::mutex> lk(_mutex); + ASSERT_TRUE(_hasRolledBack); + ASSERT_FALSE(_hasCalledCallback); + ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 4); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); +} + +TEST_F(RollbackCheckerTest, NoRollbackRBID) { + auto callback = [this](const Status& status) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (status.isOK()) { + _hasCalledCallback = true; + } else if (status.code() == ErrorCodes::UnrecoverableRollbackError) { + _hasRolledBack = true; + } + }; + // First set the RBID to 3. + auto refreshCBH = getRollbackChecker()->reset(callback); + ASSERT(refreshCBH); + auto commandResponse = BSON("ok" << 1 << "rbid" << 3); + scheduleNetworkResponse(commandResponse); + getNet()->runReadyNetworkOperations(); + getReplExecutor().wait(refreshCBH); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); + + // Check for rollback + auto rbCBH = getRollbackChecker()->checkForRollback(callback); + ASSERT(rbCBH); + + commandResponse = BSON("ok" << 1 << "rbid" << 3); + scheduleNetworkResponse(commandResponse); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + + getReplExecutor().wait(rbCBH); + stdx::lock_guard<stdx::mutex> lk(_mutex); + ASSERT_FALSE(_hasRolledBack); + ASSERT_TRUE(_hasCalledCallback); + ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 3); + ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3); +} +} // namespace |