// replsettests.cpp : Unit tests for replica sets // /** * Copyright (C) 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #include "mongo/pch.h" #include "mongo/db/db.h" #include "mongo/db/index_builder.h" #include "mongo/db/instance.h" #include "mongo/db/json.h" #include "mongo/db/kill_current_op.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_server_status.h" // replSettings #include "mongo/db/repl/rs.h" #include "mongo/dbtests/dbtests.h" #include "mongo/util/time_support.h" namespace mongo { void createOplog(); } namespace ReplSetTests { const int replWriterThreadCount(32); const int replPrefetcherThreadCount(32); class ReplSetTest : public ReplSet { ReplSetConfig *_config; ReplSetConfig::MemberCfg *_myConfig; replset::BackgroundSyncInterface *_syncTail; public: static const int replWriterThreadCount; static const int replPrefetcherThreadCount; static ReplSetTest* make() { auto_ptr ret(new ReplSetTest()); ret->init(); return ret.release(); } virtual ~ReplSetTest() { delete _myConfig; delete _config; } virtual bool isSecondary() { return true; } virtual bool isPrimary() { return false; } virtual bool tryToGoLiveAsASecondary(OpTime& minvalid) { return false; } virtual const ReplSetConfig& config() { return *_config; } virtual const ReplSetConfig::MemberCfg& myConfig() { return *_myConfig; } virtual bool buildIndexes() const { return true; } void setSyncTail(replset::BackgroundSyncInterface *syncTail) { _syncTail = syncTail; } private: ReplSetTest() : _syncTail(0) { } void init() { BSONArrayBuilder members; members.append(BSON("_id" << 0 << "host" << "host1")); _config = ReplSetConfig::make(BSON("_id" << "foo" << "members" << members.arr())); _myConfig = new ReplSetConfig::MemberCfg(); } }; class BackgroundSyncTest : public replset::BackgroundSyncInterface { std::queue _queue; public: BackgroundSyncTest() {} virtual ~BackgroundSyncTest() {} virtual bool peek(BSONObj* op) { if (_queue.empty()) { return false; } *op = _queue.front(); return true; } virtual void consume() { _queue.pop(); } virtual Member* getSyncTarget() { return 0; } void addDoc(BSONObj doc) { _queue.push(doc.getOwned()); } virtual void waitForMore() { return; } }; class Base { private: static DBDirectClient client_; protected: static BackgroundSyncTest* _bgsync; static replset::SyncTail* _tailer; public: Base() { } ~Base() { } static const char *ns() { return "unittests.repltests"; } DBDirectClient *client() const { return &client_; } static void insert( const BSONObj &o, bool god = false ) { Lock::DBWrite lk(ns()); Client::Context ctx(ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { coll = db->createCollection(ns()); } if (o.hasField("_id")) { coll->insertDocument(o, true); return; } class BSONObjBuilder b; OID id; id.init(); b.appendOID("_id", &id); b.appendElements(o); coll->insertDocument(b.obj(), true); } BSONObj findOne( const BSONObj &query = BSONObj() ) const { return client()->findOne( ns(), query ); } void drop() { Client::WriteContext c(ns()); Database* db = c.ctx().db(); if ( db->getCollection( ns() ) == NULL ) { return; } db->dropCollection(ns()); } static void setup() { replSettings.replSet = "foo"; replSettings.oplogSize = 5 * 1024 * 1024; createOplog(); // setup background sync instance _bgsync = new BackgroundSyncTest(); // setup tail _tailer = new replset::SyncTail(_bgsync); // setup theReplSet ReplSetTest *rst = ReplSetTest::make(); rst->setSyncTail(_bgsync); delete theReplSet; theReplSet = rst; } }; DBDirectClient Base::client_; BackgroundSyncTest* Base::_bgsync = NULL; replset::SyncTail* Base::_tailer = NULL; class MockInitialSync : public replset::InitialSync { int step; public: MockInitialSync() : InitialSync(0), step(0), failOnStep(SUCCEED), retry(true) {} enum FailOn {SUCCEED, FAIL_FIRST_APPLY, FAIL_BOTH_APPLY}; FailOn failOnStep; bool retry; // instead of actually applying operations, we return success or failure virtual bool syncApply(const BSONObj& o, bool convertUpdateToUpsert) { step++; if ((failOnStep == FAIL_FIRST_APPLY && step == 1) || (failOnStep == FAIL_BOTH_APPLY)) { return false; } return true; } virtual bool shouldRetry(const BSONObj& o) { return retry; } }; class TestInitApplyOp : public Base { public: void run() { OpTime o; { mongo::mutex::scoped_lock lk2(OpTime::m); o = OpTime::now(lk2); } BSONObjBuilder b; b.append("ns","dummy"); b.appendTimestamp("ts", o.asLL()); BSONObj obj = b.obj(); MockInitialSync mock; // all three should succeed std::vector ops; ops.push_back(obj); replset::multiInitialSyncApply(ops, &mock); mock.failOnStep = MockInitialSync::FAIL_FIRST_APPLY; replset::multiInitialSyncApply(ops, &mock); mock.retry = false; replset::multiInitialSyncApply(ops, &mock); drop(); } }; class SyncTest2 : public replset::InitialSync { public: bool insertOnRetry; SyncTest2() : InitialSync(0), insertOnRetry(false) {} virtual ~SyncTest2() {} virtual bool shouldRetry(const BSONObj& o) { if (!insertOnRetry) { return true; } Base::insert(BSON("_id" << 123)); return true; } }; class TestInitApplyOp2 : public Base { public: void run() { OpTime o = OpTime::_now(); BSONObjBuilder b; b.appendTimestamp("ts", o.asLL()); b.append("op", "u"); b.append("o", BSON("$set" << BSON("x" << 456))); b.append("o2", BSON("_id" << 123)); b.append("ns", ns()); BSONObj obj = b.obj(); SyncTest2 sync2; std::vector ops; ops.push_back(obj); sync2.insertOnRetry = true; // succeeds multiInitialSyncApply(ops, &sync2); BSONObj fin = findOne(); verify(fin["x"].Number() == 456); drop(); } }; class CappedInitialSync : public Base { string _cappedNs; Lock::DBWrite _lk; string spec() const { return "{\"capped\":true,\"size\":512}"; } void create() { Client::Context c(_cappedNs); ASSERT( userCreateNS( c.db(), _cappedNs, fromjson( spec() ), false ).isOK() ); } void dropCapped() { Client::Context c(_cappedNs); Database* db = c.db(); if ( db->getCollection( _cappedNs ) ) { db->dropCollection( _cappedNs ); } } BSONObj updateFail() { BSONObjBuilder b; { mongo::mutex::scoped_lock lk2(OpTime::m); b.appendTimestamp("ts", OpTime::now(lk2).asLL()); } b.append("op", "u"); b.append("o", BSON("$set" << BSON("x" << 456))); b.append("o2", BSON("_id" << 123 << "x" << 123)); b.append("ns", _cappedNs); BSONObj o = b.obj(); verify(!apply(o)); return o; } public: CappedInitialSync() : _cappedNs("unittests.foo.bar"), _lk(_cappedNs) { dropCapped(); create(); } virtual ~CappedInitialSync() { dropCapped(); } string& cappedNs() { return _cappedNs; } // returns true on success, false on failure bool apply(const BSONObj& op) { Client::Context ctx( _cappedNs ); // in an annoying twist of api, returns true on failure return !applyOperation_inlock(ctx.db(), op, true); } void run() { Lock::DBWrite lk(_cappedNs); BSONObj op = updateFail(); Sync s(""); verify(!s.shouldRetry(op)); } }; class CappedUpdate : public CappedInitialSync { void updateSucceed() { BSONObjBuilder b; { mongo::mutex::scoped_lock lk2(OpTime::m); b.appendTimestamp("ts", OpTime::now(lk2).asLL()); } b.append("op", "u"); b.append("o", BSON("$set" << BSON("x" << 789))); b.append("o2", BSON("x" << 456)); b.append("ns", cappedNs()); verify(apply(b.obj())); } void insert() { Client::Context ctx(cappedNs()); Database* db = ctx.db(); Collection* coll = db->getCollection(cappedNs()); if (!coll) { coll = db->createCollection(cappedNs()); } BSONObj o = BSON(GENOID << "x" << 456); DiskLoc loc = coll->insertDocument(o, true).getValue(); verify(!loc.isNull()); } public: virtual ~CappedUpdate() {} void run() { // RARELY shoud be once/128x for (int i=0; i<150; i++) { insert(); updateSucceed(); } DBDirectClient client; int count = (int) client.count(cappedNs(), BSONObj()); verify(count > 1); // check _id index created Client::Context ctx(cappedNs()); Collection* collection = ctx.db()->getCollection( cappedNs() ); verify(collection->getIndexCatalog()->findIdIndex()); } }; class CappedInsert : public CappedInitialSync { void insertSucceed() { BSONObjBuilder b; { mongo::mutex::scoped_lock lk2(OpTime::m); b.appendTimestamp("ts", OpTime::now(lk2).asLL()); } b.append("op", "i"); b.append("o", BSON("_id" << 123 << "x" << 456)); b.append("ns", cappedNs()); verify(apply(b.obj())); } public: virtual ~CappedInsert() {} void run() { // This will succeed, but not insert anything because they are changed to upserts for (int i=0; i<150; i++) { insertSucceed(); } // this changed in 2.1.2 // we now have indexes on capped collections Client::Context ctx(cappedNs()); Collection* collection = ctx.db()->getCollection( cappedNs() ); verify(collection->getIndexCatalog()->findIdIndex()); } }; class TestRSSync : public Base { void addOp(const string& op, BSONObj o, BSONObj* o2 = NULL, const char* coll = NULL, int version = 0) { OpTime ts; { Lock::GlobalWrite lk; ts = OpTime::_now(); } BSONObjBuilder b; b.appendTimestamp("ts", ts.asLL()); if (version != 0) { b.append("v", version); } b.append("op", op); b.append("o", o); if (o2) { b.append("o2", *o2); } if (coll) { b.append("ns", coll); } else { b.append("ns", ns()); } _bgsync->addDoc(b.done()); } void addInserts(int expected) { for (int i=0; ioplogApplication(); } public: void run() { const int expected = 100; drop(); addInserts(100); applyOplog(); ASSERT_EQUALS(expected, static_cast(client()->count(ns()))); drop(); addVersionedInserts(100); applyOplog(); ASSERT_EQUALS(expected, static_cast(client()->count(ns()))); drop(); addUpdates(); applyOplog(); BSONObj obj = findOne(); ASSERT_EQUALS(1334813340, obj["requests"]["1000001_2"]["timestamp"].number()); ASSERT_EQUALS(1334813368, obj["requests"]["1000002_2"]["timestamp"].number()); ASSERT_EQUALS(1334810820, obj["requests"]["100002_1"]["timestamp"].number()); drop(); // test converting updates to upserts but only for version 2.2.1 and greater, // which means oplog version 2 and greater. addConflictingUpdates(); applyOplog(); drop(); } }; class All : public Suite { public: All() : Suite( "replset" ) { } void setupTests() { Base::setup(); add< TestInitApplyOp >(); add< TestInitApplyOp2 >(); add< CappedInitialSync >(); add< CappedUpdate >(); add< CappedInsert >(); add< TestRSSync >(); } } myall; }