/**
 * Copyright (C) 2015 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault

#include "mongo/platform/basic.h"

#include <memory>

#include "mongo/client/fetcher.h"
#include "mongo/db/json.h"
#include "mongo/db/repl/base_cloner_test_fixture.h"
#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/log.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"

namespace {

using namespace mongo;
using namespace mongo::repl;
using executor::NetworkInterfaceMock;

using LockGuard = stdx::lock_guard<stdx::mutex>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
using mutex = stdx::mutex;

ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) {
    ReplicaSetConfig config;
    ASSERT_OK(config.initialize(configBson));
    ASSERT_OK(config.validate());
    return config;
}

const HostAndPort target("localhost", -1);

class DataReplicatorTest : public ReplicationExecutorTest {
public:
    DataReplicatorTest() {}
    void postExecutorThreadLaunch() override {}

    void reset() {
        // clear/reset state
    }

    void createDataReplicator(DataReplicatorOptions opts) {
        _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get()));
        _dr->__setSourceForTesting(target);
    }

    void createDataReplicator(DataReplicatorOptions opts,
                              DataReplicator::OnBatchCompleteFn batchCompletedFn) {
        _dr.reset(new DataReplicator(opts, &(getExecutor()), _repl.get(), batchCompletedFn));
        _dr->__setSourceForTesting(target);
    }

    void scheduleNetworkResponse(const BSONObj& obj) {
        NetworkInterfaceMock* net = getNet();
        ASSERT_TRUE(net->hasReadyRequests());
        scheduleNetworkResponse(net->getNextReadyRequest(), obj);
    }
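    // The helpers below wrap NetworkInterfaceMock: the schedule* overloads queue a canned
    // response (or error) for the next ready request, and the process* variants additionally run
    // the ready network operations so that the corresponding DataReplicator callback fires.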
    void scheduleNetworkResponse(NetworkInterfaceMock::NetworkOperationIterator noi,
                                 const BSONObj& obj) {
        NetworkInterfaceMock* net = getNet();
        Milliseconds millis(0);
        RemoteCommandResponse response(obj, millis);
        ReplicationExecutor::ResponseStatus responseStatus(response);
        net->scheduleResponse(noi, net->now(), responseStatus);
    }

    void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
        NetworkInterfaceMock* net = getNet();
        ASSERT_TRUE(net->hasReadyRequests());
        ReplicationExecutor::ResponseStatus responseStatus(code, reason);
        net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
    }

    void processNetworkResponse(const BSONObj& obj) {
        scheduleNetworkResponse(obj);
        finishProcessingNetworkResponse();
    }

    void processNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
        scheduleNetworkResponse(code, reason);
        finishProcessingNetworkResponse();
    }

    void finishProcessingNetworkResponse() {
        getNet()->runReadyNetworkOperations();
        ASSERT_FALSE(getNet()->hasReadyRequests());
    }

    DataReplicator& getDR() {
        return *_dr;
    }
    ReplicationCoordinator& getRepl() {
        return *_repl;
    }

protected:
    void setUp() override {
        ReplicationExecutorTest::setUp();
        reset();

        _settings.replSet = "foo";  // We are a replica set :)
        _repl.reset(new ReplicationCoordinatorMock(_settings));
        launchExecutorThread();
        DataReplicatorOptions options;
        options.initialSyncRetryWait = Milliseconds(0);
        createDataReplicator(options);
    }

    void tearDown() override {
        ReplicationExecutorTest::tearDown();
        _dr.reset();
        _repl.reset();
        // Executor may still invoke callback before shutting down.
    }

private:
    std::unique_ptr<DataReplicator> _dr;
    std::unique_ptr<ReplicationCoordinator> _repl;
    ReplSettings _settings;
};

TEST_F(DataReplicatorTest, CreateDestroy) {}

TEST_F(DataReplicatorTest, StartOk) {
    ASSERT_EQ(getDR().start().code(), ErrorCodes::OK);
}

TEST_F(DataReplicatorTest, CannotInitialSyncAfterStart) {
    ASSERT_EQ(getDR().start().code(), ErrorCodes::OK);
    ASSERT_EQ(getDR().initialSync(), ErrorCodes::AlreadyInitialized);
}

// Used to run an initial sync in a separate thread, to avoid blocking test execution.
class InitialSyncBackgroundRunner {
public:
    InitialSyncBackgroundRunner(DataReplicator* dr)
        : _dr(dr), _result(Status(ErrorCodes::BadValue, "failed to set status")) {}

    // Could block if _sgr has not finished
    TimestampStatus getResult() {
        _thread->join();
        return _result;
    }

    void run() {
        _thread.reset(new stdx::thread(stdx::bind(&InitialSyncBackgroundRunner::_run, this)));
        sleepmillis(2);  // sleep to let new thread run initialSync so it schedules work
    }

private:
    void _run() {
        setThreadName("InitialSyncRunner");
        log() << "starting initial sync";
        _result = _dr->initialSync();  // blocking
    }

    DataReplicator* _dr;
    TimestampStatus _result;
    std::unique_ptr<stdx::thread> _thread;
};

class InitialSyncTest : public DataReplicatorTest {
public:
    InitialSyncTest()
        : _insertCollectionFn([&](OperationContext* txn,
                                  const NamespaceString& theNss,
                                  const std::vector<BSONObj>& theDocuments) {
              log() << "insertDoc for " << theNss.toString();
              LockGuard lk(_collectionCountMutex);
              ++(_collectionCounts[theNss.toString()]);
              return Status::OK();
          }),
          _beginCollectionFn([&](OperationContext* txn,
                                 const NamespaceString& theNss,
                                 const CollectionOptions& theOptions,
                                 const std::vector<BSONObj>& theIndexSpecs) {
              log() << "beginCollection for " << theNss.toString();
              LockGuard lk(_collectionCountMutex);
              _collectionCounts[theNss.toString()] = 0;
              return Status::OK();
          }) {}

protected:
    void setStorageFuncs(ClonerStorageInterfaceMock::InsertCollectionFn ins,
                         ClonerStorageInterfaceMock::BeginCollectionFn beg) {
        _insertCollectionFn = ins;
        _beginCollectionFn = beg;
    }

    void setResponses(std::vector<BSONObj> resps) {
        _responses = resps;
    }

    void startSync() {
        DataReplicator* dr = &(getDR());

        _storage.beginCollectionFn = _beginCollectionFn;
        _storage.insertDocumentsFn = _insertCollectionFn;
        _storage.insertMissingDocFn = [&](OperationContext* txn,
                                          const NamespaceString& nss,
                                          const BSONObj& doc) { return Status::OK(); };

        dr->_setInitialSyncStorageInterface(&_storage);
        _isbr.reset(new InitialSyncBackgroundRunner(dr));
        _isbr->run();
    }

    void playResponses() {
        // TODO: Handle network responses
        NetworkInterfaceMock* net = getNet();
        int processedRequests(0);
        const int expectedResponses(_responses.size());

        // counter for oplog entries
        int c(0);
        while (true) {
            net->enterNetwork();
            if (!net->hasReadyRequests() && processedRequests < expectedResponses) {
                net->exitNetwork();
                continue;
            }
            NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();

            const BSONObj reqBSON = noi->getRequest().cmdObj;
            const BSONElement cmdElem = reqBSON.firstElement();
            const bool isGetMore = cmdElem.fieldNameStringData().equalCaseInsensitive("getmore");
            const long long cursorId = cmdElem.numberLong();
            if (isGetMore && cursorId == 1LL) {
                // process getmore requests from the oplog fetcher
                auto respBSON =
                    fromjson(str::stream()
                             << "{ok:1, cursor:{id:1, ns:'local.oplog.rs', nextBatch:["
                                "{ts:Timestamp(" << ++c
                             << ",1), h:1, ns:'test.a', v:2, op:'u', o2:{_id:" << c
                             << "}, o:{$set:{a:1}}}"
                                "]}}");
                net->scheduleResponse(
                    noi,
                    net->now(),
                    ResponseStatus(RemoteCommandResponse(respBSON, Milliseconds(10))));
                net->runReadyNetworkOperations();
                net->exitNetwork();
                continue;
            } else if (isGetMore) {
                // TODO: return more data
            }
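            // Everything else is answered from the fixed response list in arrival order; any
            // request still pending after the list is exhausted is caught by the assertion
            // following the loop.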
            // process fixed set of responses
            log() << "processing network request: " << noi->getRequest().dbname << "."
                  << noi->getRequest().cmdObj.toString();
            net->scheduleResponse(noi,
                                  net->now(),
                                  ResponseStatus(RemoteCommandResponse(
                                      _responses[processedRequests], Milliseconds(10))));

            net->runReadyNetworkOperations();
            net->exitNetwork();
            if (++processedRequests >= expectedResponses) {
                log() << "done processing expected requests ";
                break;  // stop once we have processed all expected requests
            }
        }

        net->enterNetwork();
        if (net->hasReadyRequests()) {
            log() << "There are unexpected requests left";
            log() << "next cmd: " << net->getNextReadyRequest()->getRequest().cmdObj.toString();
            ASSERT_FALSE(net->hasReadyRequests());
        }
        net->exitNetwork();
    }

    void verifySync(Status s = Status::OK()) {
        verifySync(_isbr->getResult().getStatus().code());
    }

    void verifySync(ErrorCodes::Error code) {
        // Check result
        ASSERT_EQ(_isbr->getResult().getStatus().code(), code) << "status codes differ";
    }

    std::map<std::string, int> getLocalCollectionCounts() {
        return _collectionCounts;
    }

private:
    ClonerStorageInterfaceMock::InsertCollectionFn _insertCollectionFn;
    ClonerStorageInterfaceMock::BeginCollectionFn _beginCollectionFn;
    std::vector<BSONObj> _responses;
    std::unique_ptr<InitialSyncBackgroundRunner> _isbr;
    std::map<std::string, int> _collectionCounts;  // counts of inserts during cloning
    mutex _collectionCountMutex;                   // used to protect the collectionCount map
    ClonerStorageInterfaceMock _storage;
};

TEST_F(InitialSyncTest, Complete) {
    /**
     * Initial sync will issue these queries/commands:
     *   - startTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
     *   - listDatabases (foreach db do below)
     *   -- cloneDatabase (see DatabaseCloner tests).
     *   - endTS = oplog.rs->find().sort({$natural:-1}).limit(-1).next()["ts"]
     *   - ops = oplog.rs->find({ts:{$gte: startTS}}) (foreach op)
     *   -- if local doc is missing, getCollection(op.ns).findOne(_id:op.o2._id)
     *   - if any retries were done in the previous loop, endTS query again for minvalid
     */

    const std::vector<BSONObj> responses = {
        // get latest oplog ts
        fromjson(
            "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
            "]}}"),
        // oplog fetcher find
        fromjson(
            "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
            "]}}"),
        // Clone Start
        // listDatabases
        fromjson("{ok:1, databases:[{name:'a'}]}"),
        // listCollections for "a"
        fromjson(
            "{ok:1, cursor:{id:0, ns:'a.$cmd.listCollections', firstBatch:["
            "{name:'a', options:{}} "
            "]}}"),
        // listIndexes:a
        fromjson(
            "{ok:1, cursor:{id:0, ns:'a.$cmd.listIndexes.a', firstBatch:["
            "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
            "]}}"),
        // find:a
        fromjson(
            "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:["
            "{_id:1, a:1} "
            "]}}"),
        // Clone Done
        // get latest oplog ts
        fromjson(
            "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
            "]}}"),
        // Applier starts ...
    };
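    // The initial sync attempt runs on a background thread (startSync()); playResponses() then
    // answers each request it makes with the canned responses above, in order.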
    startSync();
    setResponses(responses);
    playResponses();
    verifySync();
}

TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
    DataReplicatorOptions opts;
    int applyCounter{0};
    opts.applierFn = [&](OperationContext* txn, const BSONObj& op) {
        if (++applyCounter == 1) {
            return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
        }
        return Status::OK();
    };
    createDataReplicator(opts);

    const std::vector<BSONObj> responses = {
        // get latest oplog ts
        fromjson(
            "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
            "]}}"),
        // oplog fetcher find
        fromjson(
            "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'u', o2:{_id:1}, o:{$set:{a:1}}}"
            "]}}"),
        // Clone Start
        // listDatabases
        fromjson("{ok:1, databases:[{name:'a'}]}"),
        // listCollections for "a"
        fromjson(
            "{ok:1, cursor:{id:0, ns:'a.$cmd.listCollections', firstBatch:["
            "{name:'a', options:{}} "
            "]}}"),
        // listIndexes:a
        fromjson(
            "{ok:1, cursor:{id:0, ns:'a.$cmd.listIndexes.a', firstBatch:["
            "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
            "]}}"),
        // find:a -- empty
        fromjson("{ok:1, cursor:{id:0, ns:'a.a', firstBatch:[]}}"),
        // Clone Done
        // get latest oplog ts
        fromjson(
            "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
            "]}}"),
        // Applier starts ...
        // missing doc fetch -- find:a {_id:1}
        fromjson(
            "{ok:1, cursor:{id:0, ns:'a.a', firstBatch:["
            "{_id:1, a:1} "
            "]}}"),
    };
    startSync();
    setResponses(responses);
    playResponses();
    verifySync(ErrorCodes::OK);
}

TEST_F(InitialSyncTest, Failpoint) {
    mongo::getGlobalFailPointRegistry()
        ->getFailPoint("failInitialSyncWithBadHost")
        ->setMode(FailPoint::alwaysOn);

    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version" << 1 << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345")));
    ReplicaSetConfig config = assertMakeRSConfig(configObj);

    Timestamp time1(100, 1);
    OpTime opTime1(time1, OpTime::kDefaultTerm);
    getRepl().setMyLastOptime(opTime1);
    ASSERT(getRepl().setFollowerMode(MemberState::RS_SECONDARY));

    DataReplicator* dr = &(getDR());
    InitialSyncBackgroundRunner isbr(dr);
    isbr.run();
    ASSERT_EQ(isbr.getResult().getStatus().code(), ErrorCodes::InitialSyncFailure);

    mongo::getGlobalFailPointRegistry()
        ->getFailPoint("failInitialSyncWithBadHost")
        ->setMode(FailPoint::off);
}

TEST_F(InitialSyncTest, FailsOnClone) {
    const std::vector<BSONObj> responses = {
        // get latest oplog ts
        fromjson(
            "{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
            "]}}"),
        // oplog fetcher find
        fromjson(
            "{ok:1, cursor:{id:1, ns:'local.oplog.rs', firstBatch:["
            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
            "]}}"),
        // Clone Start
        // listDatabases
        fromjson("{ok:0}")};
    startSync();
    setResponses(responses);
    playResponses();
    verifySync(ErrorCodes::InitialSyncFailure);
}

class SteadyStateTest : public DataReplicatorTest {
protected:
    // Responds to the first oplog fetcher request with 'oplogFetcherResponse' and expects the
    // node to transition to RECOVERING.
    void _testOplogStartMissing(const BSONObj& oplogFetcherResponse) {
        DataReplicator& dr = getDR();
        auto net = getNet();
        net->enterNetwork();
        ASSERT_OK(dr.start());
        ASSERT_TRUE(net->hasReadyRequests());
        auto noi = net->getNextReadyRequest();
        scheduleNetworkResponse(noi, oplogFetcherResponse);
        net->runReadyNetworkOperations();
        ASSERT_EQUALS(MemberState(MemberState::RS_RECOVERING).toString(),
                      getRepl().getMemberState().toString());
    }
};
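// In the steady-state tests below, dr.start() returns a Status without blocking, so the tests
// answer the remote oplog fetcher's requests directly on the mock network from the test thread
// rather than using InitialSyncBackgroundRunner.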
TEST_F(SteadyStateTest, StartWhenInSteadyState) {
    DataReplicator& dr = getDR();
    ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
    ASSERT_OK(dr.start());
    ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
    ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr.start().code());
}

TEST_F(SteadyStateTest, ShutdownAfterStart) {
    DataReplicator& dr = getDR();
    ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
    auto net = getNet();
    net->enterNetwork();
    ASSERT_OK(dr.start());
    ASSERT_TRUE(net->hasReadyRequests());
    getExecutor().shutdown();
    ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
    ASSERT_EQUALS(ErrorCodes::IllegalOperation, dr.start().code());
}

TEST_F(SteadyStateTest, RequestShutdownAfterStart) {
    DataReplicator& dr = getDR();
    ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
    auto net = getNet();
    net->enterNetwork();
    ASSERT_OK(dr.start());
    ASSERT_TRUE(net->hasReadyRequests());
    ASSERT_EQUALS(toString(DataReplicatorState::Steady), toString(dr.getState()));
    // Simulate an invalid remote oplog query response. This will invalidate the existing
    // sync source, but that's fine because we're not testing oplog processing.
    scheduleNetworkResponse(BSON("ok" << 0));
    net->runReadyNetworkOperations();
    ASSERT_OK(dr.scheduleShutdown());
    net->exitNetwork();  // runs work item scheduled in 'scheduleShutdown()'.
    dr.waitForShutdown();
    ASSERT_EQUALS(toString(DataReplicatorState::Uninitialized), toString(dr.getState()));
}

TEST_F(SteadyStateTest, RemoteOplogEmpty) {
    _testOplogStartMissing(fromjson("{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch: []}}"));
}

TEST_F(SteadyStateTest, RemoteOplogFirstOperationMissingTimestamp) {
    _testOplogStartMissing(
        fromjson("{ok:1, cursor:{id:0, ns:'local.oplog.rs', firstBatch: [{}]}}"));
}

TEST_F(SteadyStateTest, RemoteOplogFirstOperationTimestampDoesNotMatch) {
    _testOplogStartMissing(fromjson(
        "{ok:1, cursor:{id:0, ns:'local.oplog.rs', "
        "firstBatch: [{ts:Timestamp(1,1)}]}}"));
}

TEST_F(SteadyStateTest, ApplyOneOperation) {
    auto operationToApply = BSON("op"
                                 << "a"
                                 << "ts" << Timestamp(Seconds(123), 0));
    stdx::mutex mutex;
    unittest::Barrier barrier(2U);
    Timestamp lastTimestampApplied;
    BSONObj operationApplied;
    auto batchCompletedFn = [&](const Timestamp& ts) {
        stdx::lock_guard<stdx::mutex> lock(mutex);
        lastTimestampApplied = ts;
        barrier.countDownAndWait();
    };
    DataReplicatorOptions opts;
    opts.applierFn = [&](OperationContext* txn, const BSONObj& op) {
        stdx::lock_guard<stdx::mutex> lock(mutex);
        operationApplied = op;
        barrier.countDownAndWait();
        return Status::OK();
    };
    createDataReplicator(opts, batchCompletedFn);

    auto& repl = getRepl();
    repl.setMyLastOptime(OpTime(operationToApply["ts"].timestamp(), 0));
    ASSERT_TRUE(repl.setFollowerMode(MemberState::RS_SECONDARY));

    auto net = getNet();
    net->enterNetwork();
    auto& dr = getDR();
    ASSERT_OK(dr.start());
    ASSERT_TRUE(net->hasReadyRequests());
    {
        auto networkRequest = net->getNextReadyRequest();
        auto commandResponse =
            BSON("ok" << 1 << "cursor" << BSON("id" << 0LL << "ns"
                                                    << "local.oplog.rs"
                                                    << "firstBatch"
                                                    << BSON_ARRAY(operationToApply)));
        scheduleNetworkResponse(networkRequest, commandResponse);
    }
    net->runReadyNetworkOperations();

    // Wait for applier function.
    barrier.countDownAndWait();
    ASSERT_EQUALS(operationToApply["ts"].timestamp(), dr.getLastTimestampFetched());

    // Run the scheduleWork() work item scheduled in DataReplicator::_onApplyBatchFinish().
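    // Exiting the network lets that work item run; it invokes batchCompletedFn, which releases
    // the second barrier wait below.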
    net->exitNetwork();

    // Wait for batch completion callback.
    barrier.countDownAndWait();

    ASSERT_EQUALS(MemberState(MemberState::RS_SECONDARY).toString(),
                  repl.getMemberState().toString());
    {
        stdx::lock_guard<stdx::mutex> lock(mutex);
        ASSERT_EQUALS(operationToApply, operationApplied);
        ASSERT_EQUALS(operationToApply["ts"].timestamp(), lastTimestampApplied);
    }

    // Ensure that we send position information upstream after completing batch.
    net->enterNetwork();
    ASSERT_TRUE(net->hasReadyRequests());
    {
        auto networkRequest = net->getNextReadyRequest();
        auto commandRequest = networkRequest->getRequest();
        ASSERT_EQUALS("admin", commandRequest.dbname);
        const auto& cmdObj = commandRequest.cmdObj;
        ASSERT_EQUALS(std::string("replSetUpdatePosition"), cmdObj.firstElementFieldName());
    }
}

}  // namespace