/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include <memory>
#include "mongo/client/fetcher.h"
#include "mongo/db/json.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
namespace {
using namespace mongo;
using namespace mongo::repl;
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using LockGuard = stdx::lock_guard<stdx::mutex>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
using mutex = stdx::mutex;
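// Sync source selector stub that returns its configured sync source on the first call to
// chooseNewSyncSource() and an empty HostAndPort thereafter; blacklisted hosts are recorded
// in _blacklistedSource.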
class SyncSourceSelectorMock : public SyncSourceSelector {
MONGO_DISALLOW_COPYING(SyncSourceSelectorMock);
public:
SyncSourceSelectorMock(const HostAndPort& syncSource) : _syncSource(syncSource) {}
void clearSyncSourceBlacklist() override {}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
HostAndPort result = _syncSource;
_syncSource = HostAndPort();
return result;
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_blacklistedSource = host;
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& sourcesOpTime,
bool syncSourceHasSyncSource) override {
return false;
}
HostAndPort _syncSource;
HostAndPort _blacklistedSource;
};
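// Base fixture for DataReplicator tests. Wires the DataReplicator's applier, rollback and
// optime callbacks to overridable std::function members and forwards SyncSourceSelector
// calls to a pluggable _syncSourceSelector so individual tests can substitute their own.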
class DataReplicatorTest : public ReplicationExecutorTest, public SyncSourceSelector {
public:
DataReplicatorTest() {}
void postExecutorThreadLaunch() override {}
/**
* clear/reset state
*/
void reset() {
_applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
_rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&)
-> Status { return Status::OK(); };
_setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
_myLastOpTime = OpTime();
_memberState = MemberState::RS_UNKNOWN;
_syncSourceSelector.reset(new SyncSourceSelectorMock(HostAndPort("localhost", -1)));
}
// SyncSourceSelector
void clearSyncSourceBlacklist() override {
_syncSourceSelector->clearSyncSourceBlacklist();
}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
return _syncSourceSelector->chooseNewSyncSource(ts);
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_syncSourceSelector->blacklistSyncSource(host, until);
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& sourcesOpTime,
bool syncSourceHasSyncSource) override {
return _syncSourceSelector->shouldChangeSyncSource(
currentSource, sourcesOpTime, syncSourceHasSyncSource);
}
void scheduleNetworkResponse(const BSONObj& obj) {
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
scheduleNetworkResponse(net->getNextReadyRequest(), obj);
}
void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,
const BSONObj& obj) {
NetworkInterfaceMock* net = getNet();
Milliseconds millis(0);
RemoteCommandResponse response(obj, BSONObj(), millis);
ReplicationExecutor::ResponseStatus responseStatus(response);
net->scheduleResponse(noi, net->now(), responseStatus);
}
void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
ReplicationExecutor::ResponseStatus responseStatus(code, reason);
net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
}
void processNetworkResponse(const BSONObj& obj) {
scheduleNetworkResponse(obj);
finishProcessingNetworkResponse();
}
void processNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
scheduleNetworkResponse(code, reason);
finishProcessingNetworkResponse();
}
void finishProcessingNetworkResponse() {
getNet()->runReadyNetworkOperations();
ASSERT_FALSE(getNet()->hasReadyRequests());
}
DataReplicator& getDR() {
return *_dr;
}
protected:
void setUp() override {
ReplicationExecutorTest::setUp();
reset();
launchExecutorThread();
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
options.applierFn = [this](OperationContext* txn, const BSONObj& operation) {
return _applierFn(txn, operation);
};
options.rollbackFn = [this](OperationContext* txn,
const OpTime& lastOpTimeWritten,
const HostAndPort& syncSource) -> Status {
return _rollbackFn(txn, lastOpTimeWritten, syncSource);
};
options.prepareReplSetUpdatePositionCommandFn =
[]() -> StatusWith<BSONObj> { return BSON("replSetUpdatePosition" << 1); };
options.getMyLastOptime = [this]() { return _myLastOpTime; };
options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
options.setFollowerMode = [this](const MemberState& state) {
_memberState = state;
return true;
};
options.syncSourceSelector = this;
try {
_dr.reset(new DataReplicator(options, &(getReplExecutor())));
} catch (...) {
ASSERT_OK(exceptionToStatus());
}
}
void tearDown() override {
ReplicationExecutorTest::tearDown();
_dr.reset();
// Executor may still invoke callback before shutting down.
}
Applier::ApplyOperationFn _applierFn;
DataReplicatorOptions::RollbackFn _rollbackFn;
DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
OpTime _myLastOpTime;
MemberState _memberState;
std::unique_ptr<SyncSourceSelector> _syncSourceSelector;
private:
std::unique_ptr<DataReplicator> _dr;
};
TEST_F(DataReplicatorTest, CreateDestroy) {}
TEST_F(DataReplicatorTest, StartOk) {
ASSERT_EQ(getDR().start().code(), ErrorCodes::OK);
}
TEST_F(DataReplicatorTest, CannotInitialSyncAfterStart) {
ASSERT_EQ(getDR().start().code(), ErrorCodes::OK);
ASSERT_EQ(getDR().initialSync(), ErrorCodes::AlreadyInitialized);
}
// Used to run an initial sync in a separate thread, to avoid blocking test execution.
class InitialSyncBackgroundRunner {
public:
InitialSyncBackgroundRunner(DataReplicator* dr)
: _dr(dr), _result(Status(ErrorCodes::BadValue, "failed to set status")) {}
// Blocks until the background initial sync thread has finished.
TimestampStatus getResult() {
_thread->join();
return _result;
}
void run() {
_thread.reset(new stdx::thread(stdx::bind(&InitialSyncBackgroundRunner::_run, this)));
}
private:
void _run() {
setThreadName("InitialSyncRunner");
_result = _dr->initialSync(); // blocking
}
DataReplicator* _dr;
TimestampStatus _result;
std::unique_ptr<stdx::thread> _thread;
};
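// Fixture that drives an initial sync against a mocked network. Tests queue the expected
// command responses with setResponses(), start the sync on a background thread with
// startSync(), replay the responses with playResponses(), and check the outcome with
// verifySync().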
class InitialSyncTest : public DataReplicatorTest {
public:
InitialSyncTest()
: _insertCollectionFn([&](OperationContext* txn,
const NamespaceString& theNss,
const std::vector<BSONObj>& theDocuments) {
log() << "insertDoc for " << theNss.toString();
LockGuard lk(_collectionCountMutex);
++(_collectionCounts[theNss.toString()]);
return Status::OK();
}),
_beginCollectionFn([&](OperationContext* txn,
const NamespaceString& theNss,
const CollectionOptions& theOptions,
const std::vector<BSONObj>& theIndexSpecs) {
log() << "beginCollection for " << theNss.toString();
LockGuard lk(_collectionCountMutex);
_collectionCounts[theNss.toString()] = 0;
return Status::OK();
}){};
protected:
void setStorageFuncs(ClonerStorageInterfaceMock::InsertCollectionFn ins,
ClonerStorageInterfaceMock::BeginCollectionFn beg) {
_insertCollectionFn = ins;
_beginCollectionFn = beg;
}
void setResponses(std::vector<BSONObj> resps) {
_responses = resps;
}
void startSync() {
DataReplicator* dr = &(getDR());
_storage.beginCollectionFn = _beginCollectionFn;
_storage.insertDocumentsFn = _insertCollectionFn;
_storage.insertMissingDocFn =
[&](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) {
return Status::OK();
};
dr->_setInitialSyncStorageInterface(&_storage);
_isbr.reset(new InitialSyncBackgroundRunner(dr));
_isbr->run();
}
void playResponses() {
// TODO: Handle network responses
NetworkInterfaceMock* net = getNet();
int processedRequests(0);
const int expectedResponses(_responses.size());
// counter for oplog entries
int c(0);
while (true) {
net->enterNetwork();
if (!net->hasReadyRequests() && processedRequests < expectedResponses) {
net->exitNetwork();
continue;
}
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
const BSONObj reqBSON = noi->getRequest().cmdObj;
const BSONElement cmdElem = reqBSON.firstElement();
const bool isGetMore = cmdElem.fieldNameStringData().equalCaseInsensitive("getmore");
const long long cursorId = cmdElem.numberLong();
if (isGetMore && cursorId == 1LL) {
// process getmore requests from the oplog fetcher
auto respBSON =
fromjson(str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
" , nextBatch:[{ts:Timestamp(" << ++c
<< ",1), h:1, ns:'test.a', v:2, op:'u', o2:{_id:" << c
<< "}, o:{$set:{a:1}}}"
"]}}");
net->scheduleResponse(
noi,
net->now(),
ResponseStatus(RemoteCommandResponse(respBSON, BSONObj(), Milliseconds(10))));
net->runReadyNetworkOperations();
net->exitNetwork();
continue;
} else if (isGetMore) {
// TODO: return more data
}
// process fixed set of responses
log() << "processing network request: " << noi->getRequest().dbname << "."
<< noi->getRequest().cmdObj.toString();
net->scheduleResponse(noi,
net->now(),
ResponseStatus(RemoteCommandResponse(
_responses[processedRequests], BSONObj(), Milliseconds(10))));
net->runReadyNetworkOperations();
net->exitNetwork();
if (++processedRequests >= expectedResponses) {
log() << "done processing expected requests ";
break;  // exit the loop once we have processed all expected requests
}
}
net->enterNetwork();
if (net->hasReadyRequests()) {
log() << "There are unexpected requests left";
log() << "next cmd: " << net->getNextReadyRequest()->getRequest().cmdObj.toString();
ASSERT_FALSE(net->hasReadyRequests());
}
net->exitNetwork();
}
void verifySync(Status s = Status::OK()) {
verifySync(s.code());
}
void verifySync(ErrorCodes::Error code) {
// Check result
ASSERT_EQ(_isbr->getResult().getStatus().code(), code) << "status codes differ";
}
std::map<std::string, int> getLocalCollectionCounts() {
return _collectionCounts;
}
private:
ClonerStorageInterfaceMock::InsertCollectionFn _insertCollectionFn;
ClonerStorageInterfaceMock::BeginCollectionFn _beginCollectionFn;
std::vector<BSONObj> _responses;
std::unique_ptr<InitialSyncBackgroundRunner> _isbr;
std::map<std::string, int> _collectionCounts;  // counts of inserts during cloning
mutex _collectionCountMutex; // used to protect the collectionCount map
ClonerStorageInterfaceMock _storage;
};
TEST_F(InitialSyncTest, Complete) {
/**
* Initial Sync will issue these query/commands
* - startTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
* - listDatabases (foreach db do below)
* -- cloneDatabase (see DatabaseCloner tests).
* - endTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
* - ops = oplog.rs->find({ts:{$gte: startTS}}) (foreach op)
* -- if local doc is missing, getCollection(op.ns).findOne(_id:op.o2._id)
* - if any retries were done in the previous loop, run the endTS query again for minvalid
*
*/
const std::vector<BSONObj> responses = {
// get latest oplog ts
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
"]}}"),
// oplog fetcher find
fromjson(
"{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
"]}}"),
// Clone Start
// listDatabases
fromjson("{ok:1, databases:[{name:'a'}]}"),
// listCollections for "a"
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}"),
// listIndexes:a
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
"{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
"]}}"),
// find:a
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
"{_id:1, a:1} "
"]}}"),
// Clone Done
// get latest oplog ts
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
"]}}"),
// Applier starts ...
};
startSync();
setResponses(responses);
playResponses();
verifySync();
}
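// A NoMatchingDocument error from the applier should trigger a fetch of the missing
// document from the sync source, after which initial sync completes successfully.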
TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
_applierFn = [&](OperationContext* txn, const BSONObj& op) {
if (++applyCounter == 1) {
return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
}
return Status::OK();
};
const std::vector<BSONObj> responses = {
// get latest oplog ts
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
"]}}"),
// oplog fetcher find
fromjson(
"{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'u', o2:{_id:1}, o:{$set:{a:1}}}"
"]}}"),
// Clone Start
// listDatabases
fromjson("{ok:1, databases:[{name:'a'}]}"),
// listCollections for "a"
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}"),
// listIndexes:a
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
"{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
"]}}"),
// find:a -- empty
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"),
// Clone Done
// get latest oplog ts
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
"]}}"),
// Applier starts ...
// missing doc fetch -- find:a {_id:1}
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
"{_id:1, a:1} "
"]}}"),
};
startSync();
setResponses(responses);
playResponses();
verifySync(ErrorCodes::OK);
}
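// With the 'failInitialSyncWithBadHost' failpoint enabled, initial sync should fail with
// InitialSyncFailure.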
TEST_F(InitialSyncTest, Failpoint) {
mongo::getGlobalFailPointRegistry()
->getFailPoint("failInitialSyncWithBadHost")
->setMode(FailPoint::alwaysOn);
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version" << 1 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "node1:12345")
<< BSON("_id" << 2 << "host"
<< "node2:12345")
<< BSON("_id" << 3 << "host"
<< "node3:12345")));
Timestamp time1(100, 1);
OpTime opTime1(time1, OpTime::kInitialTerm);
_myLastOpTime = opTime1;
_memberState = MemberState::RS_SECONDARY;
DataReplicator* dr = &(getDR());
InitialSyncBackgroundRunner isbr(dr);
isbr.run();
ASSERT_EQ(isbr.getResult().getStatus().code(), ErrorCodes::InitialSyncFailure);
mongo::getGlobalFailPointRegistry()
->getFailPoint("failInitialSyncWithBadHost")
->setMode(FailPoint::off);
}
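// An error response to listDatabases aborts the cloning phase and fails the initial sync.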
TEST_F(InitialSyncTest, FailsOnClone) {
const std::vector<BSONObj> responses = {
// get latest oplog ts
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
"]}}"),
// oplog fetcher find
fromjson(
"{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
"{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
"]}}"),
// Clone Start
// listDatabases
fromjson("{ok:0}")};
startSync();
setResponses(responses);
playResponses();
verifySync(ErrorCodes::InitialSyncFailure);
}
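// Sync source selector that hands out a fresh host ("host-0", "host-1", ...) on every call
// and signals a condition variable so tests can wait for the next sync source request.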
class TestSyncSourceSelector2 : public SyncSourceSelector {
public:
void clearSyncSourceBlacklist() override {}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
LockGuard lk(_mutex);
auto result = HostAndPort(str::stream() << "host-" << _nextSourceNum++, -1);
_condition.notify_all();
return result;
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
LockGuard lk(_mutex);
_blacklistedSource = host;
}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& sourcesOpTime,
bool syncSourceHasSyncSource) override {
return false;
}
mutable stdx::mutex _mutex;
stdx::condition_variable _condition;
int _nextSourceNum{0};
HostAndPort _blacklistedSource;
};
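// Fixture for steady-state replication tests (oplog fetching, applying, and rollback).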
class SteadyStateTest : public DataReplicatorTest {
protected:
void _setUpOplogFetcherFailed() {
DataReplicator& dr = getDR();
_syncSourceSelector.reset(new TestSyncSourceSelector2());
_memberState = MemberState::RS_UNKNOWN;
auto net = getNet();
net->enterNetwork();
ASSERT_OK(dr.start());
}
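// Feeds 'oplogFetcherResponse' to the pending oplog fetcher request, waits for the rollback
// function to be invoked, and then verifies the rollback source, the blacklisted host, the
// final sync source, the final member state, and the final DataReplicator state.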
void _testOplogFetcherFailed(const BSONObj& oplogFetcherResponse,
const Status& rollbackStatus,
const HostAndPort& expectedRollbackSource,
const HostAndPort& expectedBlacklistedSource,
const HostAndPort& expectedFinalSource,
const MemberState& expectedFinalState,
const DataReplicatorState& expectedDataReplicatorState,
int expectedNextSourceNum) {
stdx::mutex mutex;
OperationContext* rollbackTxn = nullptr;
HostAndPort rollbackSource;
unittest::Barrier barrier(2U);
_rollbackFn = [&](OperationContext* txn,
const OpTime& lastOpTimeWritten,
const HostAndPort& syncSource) -> Status {
stdx::lock_guard<stdx::mutex> lock(mutex);
rollbackTxn = txn;
rollbackSource = syncSource;
barrier.countDownAndWait();
return rollbackStatus;
};
auto net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
auto noi = net->getNextReadyRequest();
ASSERT_EQUALS("find", std::string(noi->getRequest().cmdObj.firstElementFieldName()));
scheduleNetworkResponse(noi, oplogFetcherResponse);
net->runReadyNetworkOperations();
// Replicator state should be ROLLBACK before rollback function returns.
DataReplicator& dr = getDR();
ASSERT_EQUALS(toString(DataReplicatorState::Rollback), toString(dr.getState()));
// Wait for rollback function to be called.
barrier.countDownAndWait();
{
stdx::lock_guard<stdx::mutex> lock(mutex);
ASSERT_TRUE(rollbackTxn);
ASSERT_EQUALS(expectedRollbackSource, rollbackSource);
}
dr.waitForState(expectedDataReplicatorState);
// Wait for data replicator to request a new sync source if rollback is expected to fail.
if (!rollbackStatus.isOK()) {
TestSyncSourceSelector2* syncSourceSelector =
static_cast<TestSyncSourceSelector2*>(_syncSourceSelector.get());
UniqueLock lk(syncSourceSelector->_mutex);
while (syncSourceSelector->_nextSourceNum < expectedNextSourceNum) {
syncSourceSelector->_condition.wait(lk);
}
ASSERT_EQUALS(expectedBlacklistedSource, syncSourceSelector->_blacklistedSource);
}
ASSERT_EQUALS(expectedFinalSource, dr.getSyncSource());
ASSERT_EQUALS(expectedFinalState.toString(), _memberState.toString());
}
};
TEST_F(SteadyStateTest, StartWhenInSteadyState) {
DataReplicator& dr = getDR();
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
ASSERT_OK(dr.start());
ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr.start().code());
}
TEST_F(SteadyStateTest, ShutdownAfterStart) {
DataReplicator& dr = getDR();
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
auto net = getNet();
net->enterNetwork();
ASSERT_OK(dr.start());
ASSERT_TRUE(net->hasReadyRequests());
getReplExecutor().shutdown();
ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr.start().code());
}
TEST_F(SteadyStateTest, RequestShutdownAfterStart) {
DataReplicator& dr = getDR();
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
auto net = getNet();
net->enterNetwork();
ASSERT_OK(dr.start());
ASSERT_TRUE(net->hasReadyRequests());
ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
// Simulating an invalid remote oplog query response. This will invalidate the existing
// sync source but that's fine because we're not testing oplog processing.
scheduleNetworkResponse(BSON("ok" << 0));
net->runReadyNetworkOperations();
ASSERT_OK(dr.scheduleShutdown());
net->exitNetwork();  // runs work item scheduled in 'scheduleShutdown()'.
dr.waitForShutdown();
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
}
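// Sync source selector that shuts down the replication executor when asked for a new sync
// source, so that scheduling the next action fails.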
class ShutdownExecutorSyncSourceSelector : public SyncSourceSelector {
public:
ShutdownExecutorSyncSourceSelector(ReplicationExecutor* exec) : _exec(exec) {}
void clearSyncSourceBlacklist() override {}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
_exec->shutdown();
return HostAndPort();
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {}
bool shouldChangeSyncSource(const HostAndPort& currentSource,
const OpTime& sourcesOpTime,
bool syncSourceHasSyncSource) override {
return false;
}
ReplicationExecutor* _exec;
};
TEST_F(SteadyStateTest, ScheduleNextActionFailsAfterChoosingEmptySyncSource) {
_syncSourceSelector.reset(new ShutdownExecutorSyncSourceSelector(&getReplExecutor()));
DataReplicator& dr = getDR();
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
auto net = getNet();
net->enterNetwork();
ASSERT_OK(dr.start());
ASSERT_EQUALS(HostAndPort(), dr.getSyncSource());
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
}
TEST_F(SteadyStateTest, ChooseNewSyncSourceAfterFailedNetworkRequest) {
TestSyncSourceSelector2* testSyncSourceSelector = new TestSyncSourceSelector2();
_syncSourceSelector.reset(testSyncSourceSelector);
_memberState = MemberState::RS_UNKNOWN;
DataReplicator& dr = getDR();
ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
auto net = getNet();
net->enterNetwork();
ASSERT_OK(dr.start());
ASSERT_TRUE(net->hasReadyRequests());
ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
// Simulating an invalid remote oplog query response to cause the data replicator to
// blacklist the existing sync source and request a new one.
scheduleNetworkResponse(BSON("ok" << 0));
net->runReadyNetworkOperations();
// Wait for data replicator to request a new sync source.
{
UniqueLock lk(testSyncSourceSelector->_mutex);
while (testSyncSourceSelector->_nextSourceNum < 2) {
testSyncSourceSelector->_condition.wait(lk);
}
ASSERT_EQUALS(HostAndPort("host-0", -1), testSyncSourceSelector->_blacklistedSource);
}
ASSERT_EQUALS(HostAndPort("host-1", -1), dr.getSyncSource());
ASSERT_EQUALS(MemberState(MemberState::RS_UNKNOWN).toString(), _memberState.toString());
ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
}
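// An empty remote oplog triggers rollback; when rollback succeeds, the data replicator keeps
// the same sync source and returns to steady state as a secondary.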
TEST_F(SteadyStateTest, RemoteOplogEmptyRollbackSucceeded) {
_setUpOplogFetcherFailed();
auto oplogFetcherResponse =
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch: []}}");
_testOplogFetcherFailed(oplogFetcherResponse,
Status::OK(),
HostAndPort("host-0", -1), // rollback source
HostAndPort(), // sync source should not be blacklisted.
HostAndPort("host-0", -1),
MemberState::RS_SECONDARY,
DataReplicatorState::Steady,
2);
}
TEST_F(SteadyStateTest, RemoteOplogEmptyRollbackFailed) {
_setUpOplogFetcherFailed();
auto oplogFetcherResponse =
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch: []}}");
_testOplogFetcherFailed(oplogFetcherResponse,
Status(ErrorCodes::OperationFailed, "rollback failed"),
HostAndPort("host-0", -1), // rollback source
HostAndPort("host-0", -1),
HostAndPort("host-1", -1),
MemberState::RS_UNKNOWN,
DataReplicatorState::Rollback,
2);
}
TEST_F(SteadyStateTest, RemoteOplogFirstOperationMissingTimestampRollbackFailed) {
_setUpOplogFetcherFailed();
auto oplogFetcherResponse =
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch: [{}]}}");
_testOplogFetcherFailed(oplogFetcherResponse,
Status(ErrorCodes::OperationFailed, "rollback failed"),
HostAndPort("host-0", -1), // rollback source
HostAndPort("host-0", -1),
HostAndPort("host-1", -1),
MemberState::RS_UNKNOWN,
DataReplicatorState::Rollback,
2);
}
TEST_F(SteadyStateTest, RemoteOplogFirstOperationTimestampDoesNotMatchRollbackFailed) {
_setUpOplogFetcherFailed();
auto oplogFetcherResponse = fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[{ts:Timestamp(1,1)}]}}");
_testOplogFetcherFailed(oplogFetcherResponse,
Status(ErrorCodes::OperationFailed, "rollback failed"),
HostAndPort("host-0", -1), // rollback source
HostAndPort("host-0", -1),
HostAndPort("host-1", -1),
MemberState::RS_UNKNOWN,
DataReplicatorState::Rollback,
2);
}
TEST_F(SteadyStateTest, RollbackTwoSyncSourcesBothFailed) {
_setUpOplogFetcherFailed();
auto oplogFetcherResponse =
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch: []}}");
_testOplogFetcherFailed(oplogFetcherResponse,
Status(ErrorCodes::OperationFailed, "rollback failed"),
HostAndPort("host-0", -1), // rollback source
HostAndPort("host-0", -1),
HostAndPort("host-1", -1),
MemberState::RS_UNKNOWN,
DataReplicatorState::Rollback,
2);
_testOplogFetcherFailed(oplogFetcherResponse,
Status(ErrorCodes::OperationFailed, "rollback failed"),
HostAndPort("host-1", -1), // rollback source
HostAndPort("host-1", -1),
HostAndPort("host-2", -1),
MemberState::RS_UNKNOWN,
DataReplicatorState::Rollback,
3);
}
TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) {
_setUpOplogFetcherFailed();
auto oplogFetcherResponse =
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch: []}}");
_testOplogFetcherFailed(oplogFetcherResponse,
Status(ErrorCodes::OperationFailed, "rollback failed"),
HostAndPort("host-0", -1), // rollback source
HostAndPort("host-0", -1),
HostAndPort("host-1", -1),
MemberState::RS_UNKNOWN,
DataReplicatorState::Rollback,
2);
_testOplogFetcherFailed(oplogFetcherResponse,
Status::OK(),
HostAndPort("host-1", -1), // rollback source
HostAndPort("host-0", -1), // blacklisted source unchanged
HostAndPort("host-1", -1),
MemberState::RS_SECONDARY,
DataReplicatorState::Steady,
2); // not used when rollback is expected to succeed
}
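// pause() should prevent fetched operations from being handed to the applier (they remain in
// the oplog buffer) until resume() is called, after which the operation is applied and the
// last applied optime is advanced.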
TEST_F(SteadyStateTest, PauseDataReplicator) {
auto operationToApply = BSON("op"
<< "a"
<< "ts" << Timestamp(Seconds(123), 0));
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
_applierFn = [&](OperationContext* txn, const BSONObj& op) {
stdx::lock_guard<stdx::mutex> lock(mutex);
operationApplied = op;
barrier.countDownAndWait();
return Status::OK();
};
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
stdx::lock_guard<stdx::mutex> lock(mutex);
lastTimestampApplied = opTime.getTimestamp();
barrier.countDownAndWait();
};
auto& dr = getDR();
_myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kInitialTerm);
_memberState = MemberState::RS_SECONDARY;
auto net = getNet();
net->enterNetwork();
ASSERT_OK(dr.start());
ASSERT_TRUE(net->hasReadyRequests());
{
auto networkRequest = net->getNextReadyRequest();
auto commandResponse = BSON(
"ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
<< "local.oplog.rs"
<< "firstBatch" << BSON_ARRAY(operationToApply)));
scheduleNetworkResponse(networkRequest, commandResponse);
}
dr.pause();
ASSERT_EQUALS(0U, dr.getOplogBufferCount());
// Data replication will process the fetcher response but will not schedule the applier.
net->runReadyNetworkOperations();
ASSERT_EQUALS(operationToApply["ts"].timestamp(), dr.getLastTimestampFetched());
// Schedule a bogus work item to ensure that the operation applier function
// is not scheduled.
auto& exec = getReplExecutor();
exec.scheduleWork(
[&barrier](const executor::TaskExecutor::CallbackArgs&) { barrier.countDownAndWait(); });
// Wake up executor thread and wait for bogus work callback to be invoked.
net->exitNetwork();
barrier.countDownAndWait();
// Oplog buffer should contain fetched operations since applier is not scheduled.
ASSERT_EQUALS(1U, dr.getOplogBufferCount());
dr.resume();
// Wait for applier function.
barrier.countDownAndWait();
// Run scheduleWork() work item scheduled in DataReplicator::_onApplyBatchFinish().
net->exitNetwork();
// Wait for batch completion callback.
barrier.countDownAndWait();
ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), _memberState.toString());
{
stdx::lock_guard<stdx::mutex> lock(mutex);
ASSERT_EQUALS(operationToApply, operationApplied);
ASSERT_EQUALS(operationToApply["ts"].timestamp(), lastTimestampApplied);
}
}
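// A single fetched operation should be passed straight to the applier (leaving the oplog
// buffer empty), advance the last applied optime, and result in a replSetUpdatePosition
// command being sent upstream.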
TEST_F(SteadyStateTest, ApplyOneOperation) {
auto operationToApply = BSON("op"
<< "a"
<< "ts" << Timestamp(Seconds(123), 0));
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
_applierFn = [&](OperationContext* txn, const BSONObj& op) {
stdx::lock_guard<stdx::mutex> lock(mutex);
operationApplied = op;
barrier.countDownAndWait();
return Status::OK();
};
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
stdx::lock_guard<stdx::mutex> lock(mutex);
lastTimestampApplied = opTime.getTimestamp();
barrier.countDownAndWait();
};
_myLastOpTime = OpTime(operationToApply["ts"].timestamp(), OpTime::kInitialTerm);
_memberState = MemberState::RS_SECONDARY;
auto net = getNet();
net->enterNetwork();
auto& dr = getDR();
ASSERT_OK(dr.start());
ASSERT_TRUE(net->hasReadyRequests());
{
auto networkRequest = net->getNextReadyRequest();
auto commandResponse = BSON(
"ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
<< "local.oplog.rs"
<< "firstBatch" << BSON_ARRAY(operationToApply)));
scheduleNetworkResponse(networkRequest, commandResponse);
}
ASSERT_EQUALS(0U, dr.getOplogBufferCount());
// Oplog buffer should be empty because contents are transferred to applier.
net->runReadyNetworkOperations();
ASSERT_EQUALS(0U, dr.getOplogBufferCount());
// Wait for applier function.
barrier.countDownAndWait();
ASSERT_EQUALS(operationToApply["ts"].timestamp(), dr.getLastTimestampFetched());
// Run scheduleWork() work item scheduled in DataReplicator::_onApplyBatchFinish().
net->exitNetwork();
// Wait for batch completion callback.
barrier.countDownAndWait();
ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(), _memberState.toString());
{
stdx::lock_guard<stdx::mutex> lock(mutex);
ASSERT_EQUALS(operationToApply, operationApplied);
ASSERT_EQUALS(operationToApply["ts"].timestamp(), lastTimestampApplied);
}
// Ensure that we send position information upstream after completing batch.
net->enterNetwork();
ASSERT_TRUE(net->hasReadyRequests());
{
auto networkRequest = net->getNextReadyRequest();
auto commandRequest = networkRequest->getRequest();
ASSERT_EQUALS("admin", commandRequest.dbname);
const auto& cmdObj = commandRequest.cmdObj;
ASSERT_EQUALS(std::string("replSetUpdatePosition"), cmdObj.firstElementFieldName());
}
}
} // namespace