summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/connections.h128
-rw-r--r--src/mongo/db/repl/consensus.cpp449
-rw-r--r--src/mongo/db/repl/health.cpp449
-rw-r--r--src/mongo/db/repl/health.h50
-rw-r--r--src/mongo/db/repl/heartbeat.cpp382
-rw-r--r--src/mongo/db/repl/manager.cpp274
-rw-r--r--src/mongo/db/repl/multicmd.h75
-rw-r--r--src/mongo/db/repl/replset_commands.cpp404
-rw-r--r--src/mongo/db/repl/rs.cpp778
-rw-r--r--src/mongo/db/repl/rs.h667
-rw-r--r--src/mongo/db/repl/rs_config.cpp662
-rw-r--r--src/mongo/db/repl/rs_config.h251
-rw-r--r--src/mongo/db/repl/rs_exception.h17
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp271
-rw-r--r--src/mongo/db/repl/rs_initiate.cpp269
-rw-r--r--src/mongo/db/repl/rs_member.h131
-rw-r--r--src/mongo/db/repl/rs_optime.h58
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp667
-rw-r--r--src/mongo/db/repl/rs_sync.cpp701
-rw-r--r--src/mongo/db/repl/test.html11
-rw-r--r--src/mongo/db/repl/testing.js42
21 files changed, 6736 insertions, 0 deletions
diff --git a/src/mongo/db/repl/connections.h b/src/mongo/db/repl/connections.h
new file mode 100644
index 00000000000..3e08f80b047
--- /dev/null
+++ b/src/mongo/db/repl/connections.h
@@ -0,0 +1,128 @@
+// @file
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <map>
+#include "../../client/dbclient.h"
+#include "../security_common.h"
+
+namespace mongo {
+
+ /** here we keep a single connection (with reconnect) for a set of hosts,
+ one each, and allow one user at a time per host. if in use already for that
+ host, we block. so this is an easy way to keep a 1-deep pool of connections
+ that many threads can share.
+
+ thread-safe.
+
+ Example:
+ {
+ ScopedConn c("foo.acme.com:9999");
+ c->runCommand(...);
+ }
+
+ throws exception on connect error (but fine to try again later with a new
+ scopedconn object for same host).
+ */
+ // Keeps one cached DBClientConnection per hostport in a process-wide map;
+ // constructing a ScopedConn takes that host's per-connection mutex (X::z)
+ // via connLock, so only one user at a time talks on each connection.
+ class ScopedConn {
+ public:
+ /** throws assertions if connect failure etc. */
+ ScopedConn(string hostport);
+ ~ScopedConn() {
+ // conLock releases...
+ }
+ // tear the socket down and re-establish (and re-auth) in place
+ void reconnect() {
+ conn()->port().shutdown();
+ connect();
+ }
+
+ /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic.
+ So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes
+ ScopedConn limited in functionality but very safe. More non-cursor wrappers can be added here if needed.
+ */
+ bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) {
+ return conn()->runCommand(dbname, cmd, info, options);
+ }
+ unsigned long long count(const string &ns) {
+ return conn()->count(ns);
+ }
+ BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
+ return conn()->findOne(ns, q, fieldsToReturn, queryOptions);
+ }
+
+ private:
+ auto_ptr<scoped_lock> connLock; // holds X::z while this ScopedConn exists
+ static mongo::mutex mapMutex; // guards _map itself
+ // per-host shared state: the single cached connection plus its user mutex
+ struct X {
+ mongo::mutex z;
+ DBClientConnection cc;
+ bool connected;
+ X() : z("X"), cc(/*reconnect*/ true, 0, /*timeout*/ 10.0), connected(false) {
+ cc._logLevel = 2;
+ }
+ } *x;
+ typedef map<string,ScopedConn::M> M -- see below; // (typedef documented on next line)
+ DBClientConnection* conn() { return &x->cc; }
+ const string _hostport;
+
+ // we should already be locked...
+ bool connect() {
+ string err;
+ if (!x->cc.connect(_hostport, err)) {
+ log() << "couldn't connect to " << _hostport << ": " << err << rsLog;
+ return false;
+ }
+ x->connected = true;
+
+ // if we cannot authenticate against a member, then either its key file
+ // or our key file has to change. if our key file has to change, we'll
+ // be rebooting. if their file has to change, they'll be rebooted so the
+ // connection created above will go dead, reconnect, and reauth.
+ if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) {
+ log() << "could not authenticate against " << _hostport << ", " << err << rsLog;
+ return false;
+ }
+
+ return true;
+ }
+ };
+
+ // Look up (or lazily create) the shared per-host state, then make sure the
+ // cached connection is established. The per-host mutex x->z is acquired via
+ // connLock and held until destruction.
+ inline ScopedConn::ScopedConn(string hostport) : _hostport(hostport) {
+ bool first = false;
+ {
+ scoped_lock lk(mapMutex);
+ x = _map[_hostport];
+ if( x == 0 ) {
+ x = _map[_hostport] = new X();
+ first = true;
+ connLock.reset( new scoped_lock(x->z) );
+ }
+ }
+
+ // Keep trying to connect if we're not yet connected
+ // NOTE(review): when !first && !x->connected we fall through to connect()
+ // without ever taking connLock (x->z) -- looks like a missing lock
+ // acquisition on the retry path; confirm intended.
+ if( !first && x->connected ) {
+ connLock.reset( new scoped_lock(x->z) );
+ return;
+ }
+
+ connect();
+ }
+
+}
diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp
new file mode 100644
index 00000000000..3995373f5ef
--- /dev/null
+++ b/src/mongo/db/repl/consensus.cpp
@@ -0,0 +1,449 @@
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../commands.h"
+#include "rs.h"
+#include "multicmd.h"
+
+namespace mongo {
+
+ // replSetFresh: a would-be candidate asks us whether it is fresh enough to
+ // stand for election. We reply with our last optime, a "fresher" flag, and
+ // a "veto" flag (with errmsg explaining the veto).
+ class CmdReplSetFresh : public ReplSetCommand {
+ public:
+ CmdReplSetFresh() : ReplSetCommand("replSetFresh") { }
+ private:
+
+ // Returns true if we would veto this candidate's election, filling errmsg.
+ // NOTE(review): cmdObj["id"].Int() below throws if "id" is absent, so the
+ // later eoo() "older versions" guard appears unreachable -- the guard
+ // should probably run before the .Int() call; confirm.
+ bool shouldVeto(const BSONObj& cmdObj, string& errmsg) {
+ unsigned id = cmdObj["id"].Int();
+ const Member* primary = theReplSet->box.getPrimary();
+ const Member* hopeful = theReplSet->findById(id);
+ const Member *highestPriority = theReplSet->getMostElectable();
+
+ if( !hopeful ) {
+ errmsg = str::stream() << "replSet couldn't find member with id " << id;
+ return true;
+ }
+ else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
+ // hbinfo is not updated, so we have to check the primary's last optime separately
+ errmsg = str::stream() << "I am already primary, " << hopeful->fullName() <<
+ " can try again once I've stepped down";
+ return true;
+ }
+ else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
+ // other members might be aware of more up-to-date nodes
+ errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " <<
+ primary->fullName() << " is already primary and more up-to-date";
+ return true;
+ }
+ else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
+ errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
+ return true;
+ }
+
+ // don't veto older versions
+ if (cmdObj["id"].eoo()) {
+ // they won't be looking for the veto field
+ return false;
+ }
+
+ // NOTE(review): the highestPriority comparison here duplicates the
+ // branch above, which already returned true in that case.
+ if ( !theReplSet->isElectable(id) ||
+ (highestPriority && highestPriority->config().priority > hopeful->config().priority)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+
+ if( cmdObj["set"].String() != theReplSet->name() ) {
+ errmsg = "wrong repl set name";
+ return false;
+ }
+ string who = cmdObj["who"].String();
+ int cfgver = cmdObj["cfgver"].Int();
+ OpTime opTime(cmdObj["opTime"].Date());
+
+ bool weAreFresher = false;
+ if( theReplSet->config().version > cfgver ) {
+ // candidate has a stale config; tell it so and claim freshness
+ log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog;
+ result.append("info", "config version stale");
+ weAreFresher = true;
+ }
+ // check not only our own optime, but any other member we can reach
+ else if( opTime < theReplSet->lastOpTimeWritten ||
+ opTime < theReplSet->lastOtherOpTime()) {
+ weAreFresher = true;
+ }
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
+ result.append("fresher", weAreFresher);
+ result.append("veto", shouldVeto(cmdObj, errmsg));
+
+ return true;
+ }
+ } cmdReplSetFresh;
+
+ // replSetElect: an actual vote request; all the decision logic lives in
+ // Consensus::electCmdReceived, which fills in the vote/round reply fields.
+ class CmdReplSetElect : public ReplSetCommand {
+ public:
+ CmdReplSetElect() : ReplSetCommand("replSetElect") { }
+ private:
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ theReplSet->elect.electCmdReceived(cmdObj, &result);
+ return true;
+ }
+ } cmdReplSetElect;
+
+ // Sum of configured votes for self plus every other member. Logs a warning
+ // (once per process, via the static counter) if the total is even, since an
+ // even vote count permits ties.
+ int Consensus::totalVotes() const {
+ static int complain = 0;
+ int vTot = rs._self->config().votes;
+ for( Member *m = rs.head(); m; m=m->next() )
+ vTot += m->config().votes;
+ if( vTot % 2 == 0 && vTot && complain++ == 0 )
+ log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog;
+ return vTot;
+ }
+
+ // True if the votes of self plus all members currently seen as up form a
+ // strict majority of totalVotes().
+ bool Consensus::aMajoritySeemsToBeUp() const {
+ int vUp = rs._self->config().votes;
+ for( Member *m = rs.head(); m; m=m->next() )
+ vUp += m->hbinfo().up() ? m->config().votes : 0;
+ return vUp * 2 > totalVotes();
+ }
+
+ // True if we (a primary) should step down because we can no longer see a
+ // majority. More lenient than aMajoritySeemsToBeUp(): a member still counts
+ // if its downtime is under heartbeatTimeoutMillis * heartbeatConnRetries.
+ bool Consensus::shouldRelinquish() const {
+ int vUp = rs._self->config().votes;
+ const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries;
+ for( Member *m = rs.head(); m; m=m->next() ) {
+ long long dt = m->hbinfo().timeDown();
+ if( dt < T )
+ vUp += m->config().votes;
+ }
+
+ // the manager will handle calling stepdown if another node should be
+ // primary due to priority
+
+ return !( vUp * 2 > totalVotes() );
+ }
+
+ // vote value that unconditionally blocks an election
+ // NOTE(review): electCmdReceived below writes the literal -10000 instead of
+ // using this constant; keep the two in sync.
+ static const int VETO = -10000;
+
+ const time_t LeaseTime = 30; // seconds a yea-vote "lease" lasts
+
+ SimpleMutex Consensus::lyMutex("ly"); // guards Consensus::ly (last yea vote)
+
+ // Record a yea vote for memberId and return our vote weight. A vote grants
+ // a LeaseTime lease: if we voted for a *different* member within the lease
+ // window, we refuse by throwing VoteException. Re-voting for the same
+ // member just renews the lease.
+ unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */
+ SimpleMutex::scoped_lock lk(lyMutex);
+ LastYea &L = this->ly.ref(lk);
+ time_t now = time(0);
+ if( L.when + LeaseTime >= now && L.who != memberId ) {
+ LOG(1) << "replSet not voting yea for " << memberId <<
+ " voted for " << L.who << ' ' << now-L.when << " secs ago" << rsLog;
+ throw VoteException();
+ }
+ L.when = now;
+ L.who = memberId;
+ return rs._self->config().votes;
+ }
+
+ /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in
+ place instead of leaving it for a long time.
+ */
+ void Consensus::electionFailed(unsigned meid) {
+ SimpleMutex::scoped_lock lk(lyMutex);
+ LastYea &L = ly.ref(lk);
+ DEV assert( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test
+ // only clear the lease if it is still ours (someone else may have voted since)
+ if( L.who == meid )
+ L.when = 0;
+ }
+
+ /* todo: threading **************** !!!!!!!!!!!!!!!! */
+ // Handle an incoming replSetElect request: decide our vote (0 = abstain,
+ // -10000 = veto, positive = our vote weight via yea()) and append
+ // "vote"/"round" to the reply builder.
+ void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
+ BSONObjBuilder& b = *_b;
+ // DEV expands to a debug-build conditional, so the else pairs with it
+ DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
+ else LOG(2) << "replSet received elect msg " << cmd.toString() << rsLog;
+ string set = cmd["set"].String();
+ unsigned whoid = cmd["whoid"].Int();
+ int cfgver = cmd["cfgver"].Int();
+ OID round = cmd["round"].OID();
+ int myver = rs.config().version;
+
+ const Member* primary = rs.box.getPrimary();
+ const Member* hopeful = rs.findById(whoid);
+ const Member* highestPriority = rs.getMostElectable();
+
+ int vote = 0;
+ if( set != rs.name() ) {
+ log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
+ }
+ else if( myver < cfgver ) {
+ // we are stale. don't vote
+ }
+ else if( myver > cfgver ) {
+ // they are stale!
+ log() << "replSet electCmdReceived info got stale version # during election" << rsLog;
+ vote = -10000;
+ }
+ else if( !hopeful ) {
+ log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog;
+ vote = -10000;
+ }
+ else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
+ // hbinfo is not updated, so we have to check the primary's last optime separately
+ log() << "I am already primary, " << hopeful->fullName()
+ << " can try again once I've stepped down" << rsLog;
+ vote = -10000;
+ }
+ else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
+ // other members might be aware of more up-to-date nodes
+ log() << hopeful->fullName() << " is trying to elect itself but " <<
+ primary->fullName() << " is already primary and more up-to-date" << rsLog;
+ vote = -10000;
+ }
+ else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
+ // NOTE(review): unlike the other branches this log line is not
+ // terminated with << rsLog -- it will miss the replset ram log; confirm.
+ log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
+ vote = -10000;
+ }
+ else {
+ try {
+ vote = yea(whoid);
+ dassert( hopeful->id() == whoid );
+ // step down (if we are primary) before granting the vote
+ rs.relinquish();
+ log() << "replSet info voting yea for " << hopeful->fullName() << " (" << whoid << ')' << rsLog;
+ }
+ catch(VoteException&) {
+ log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog;
+ }
+ }
+
+ b.append("vote", vote);
+ b.append("round", round);
+ }
+
+ // Collect a Target for every member that might be up (per heartbeat info),
+ // and report the config version the list was built against. Caller must
+ // hold the replset lock (see getTargets).
+ void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) {
+ configVersion = config().version;
+ for( Member *m = head(); m; m=m->next() )
+ if( m->hbinfo().maybeUp() )
+ L.push_back( Target(m->fullName()) );
+ }
+
+ /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need
+ to check later that the config didn't change. */
+ // Locking wrapper around _getTargets: reuses the lock if this thread
+ // already holds it, otherwise acquires it.
+ void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) {
+ if( lockedByMe() ) {
+ _getTargets(L, configVersion);
+ return;
+ }
+ lock lk(this);
+ _getTargets(L, configVersion);
+ }
+
+ /* Do we have the newest data of them all?
+ @param allUp - set to true if all members are up. Only set if true returned.
+ @param nTies - out: number of reachable members whose optime equals ours.
+ @return true if we are freshest. Note we may tie.
+ */
+ bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
+ const OpTime ord = theReplSet->lastOpTimeWritten;
+ nTies = 0;
+ assert( !ord.isNull() );
+ BSONObj cmd = BSON(
+ "replSetFresh" << 1 <<
+ "set" << rs.name() <<
+ "opTime" << Date_t(ord.asDate()) <<
+ "who" << rs._self->fullName() <<
+ "cfgver" << rs._cfg->version <<
+ "id" << rs._self->id());
+ list<Target> L;
+ int ver;
+ /* the following queries arbiters, even though they are never fresh. wonder if that makes sense.
+ it doesn't, but it could, if they "knew" what freshness is one day. so consider removing
+ arbiters from getTargets() here. although getTargets is used elsewhere for elections; there
+ arbiters are certainly targets - so an "includeArbs" bool would be necessary if we want to make
+ not fetching them herein happen.
+ */
+ rs.getTargets(L, ver);
+ multiCommand(cmd, L);
+ int nok = 0;
+ allUp = true;
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ if( i->ok ) {
+ nok++;
+ if( i->result["fresher"].trueValue() ) {
+ log() << "not electing self, we are not freshest" << rsLog;
+ return false;
+ }
+ OpTime remoteOrd( i->result["opTime"].Date() );
+ if( remoteOrd == ord )
+ nTies++;
+ assert( remoteOrd <= ord );
+
+ if( i->result["veto"].trueValue() ) {
+ BSONElement msg = i->result["errmsg"];
+ if (!msg.eoo()) {
+ log() << "not electing self, " << i->toHost << " would veto with '" <<
+ msg.String() << "'" << rsLog;
+ }
+ else {
+ log() << "not electing self, " << i->toHost << " would veto" << rsLog;
+ }
+ return false;
+ }
+ }
+ else {
+ DEV log() << "replSet freshest returns " << i->result.toString() << rsLog;
+ allUp = false;
+ }
+ }
+ LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
+ assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
+ return true;
+ }
+
+ extern time_t started; // process start time (used for the 5-minute grace below)
+
+ // Broadcast cmd to every target; must not be called while holding the
+ // replset lock since it blocks on the network.
+ void Consensus::multiCommand(BSONObj cmd, list<Target>& L) {
+ assert( !rs.lockedByMe() );
+ mongo::multiCommand(cmd, L);
+ }
+
+ // Core of a self-election attempt: verify we're eligible and freshest, vote
+ // for ourselves, broadcast replSetElect, tally the replies, and assume
+ // primary on a strict majority. Throws RetryAfterSleepException when a
+ // freshness tie forces a randomized backoff.
+ void Consensus::_electSelf() {
+ // honor an explicit stepdown period
+ if( time(0) < steppedDown )
+ return;
+
+ {
+ const OpTime ord = theReplSet->lastOpTimeWritten;
+ if( ord == 0 ) {
+ log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog;
+ return;
+ }
+ }
+
+ bool allUp;
+ int nTies;
+ if( !weAreFreshest(allUp, nTies) ) {
+ return;
+ }
+
+ rs.sethbmsg("",9);
+
+ if( !allUp && time(0) - started < 60 * 5 ) {
+ /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
+ if we don't have to -- we'd rather be offline and wait a little longer instead
+ todo: make this configurable.
+ */
+ rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes");
+ return;
+ }
+
+ Member& me = *rs._self;
+
+ if( nTies ) {
+ /* tie? we then randomly sleep to try to not collide on our voting. */
+ /* todo: smarter. */
+ if( me.id() == 0 || sleptLast ) {
+ // would be fine for one node not to sleep
+ // todo: biggest / highest priority nodes should be the ones that get to not sleep
+ }
+ else {
+ assert( !rs.lockedByMe() ); // bad to go to sleep locked
+ unsigned ms = ((unsigned) rand()) % 1000 + 50;
+ DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog;
+ sleptLast = true;
+ sleepmillis(ms);
+ throw RetryAfterSleepException();
+ }
+ }
+ sleptLast = false;
+
+ time_t start = time(0);
+ unsigned meid = me.id();
+ // vote for ourselves first; this also takes the yea-lease
+ int tally = yea( meid );
+ bool success = false;
+ try {
+ log() << "replSet info electSelf " << meid << rsLog;
+
+ BSONObj electCmd = BSON(
+ "replSetElect" << 1 <<
+ "set" << rs.name() <<
+ "who" << me.fullName() <<
+ "whoid" << me.hbinfo().id() <<
+ "cfgver" << rs._cfg->version <<
+ "round" << OID::gen() /* this is just for diagnostics */
+ );
+
+ int configVersion;
+ list<Target> L;
+ rs.getTargets(L, configVersion);
+ multiCommand(electCmd, L);
+
+ {
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ DEV log() << "replSet elect res: " << i->result.toString() << rsLog;
+ if( i->ok ) {
+ // a veto reply carries a large negative vote, sinking the tally
+ int v = i->result["vote"].Int();
+ tally += v;
+ }
+ }
+ if( tally*2 <= totalVotes() ) {
+ log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog;
+ }
+ else if( time(0) - start > 30 ) {
+ // defensive; should never happen as we have timeouts on connection and operation for our conn
+ log() << "replSet too much time passed during our election, ignoring result" << rsLog;
+ }
+ else if( configVersion != rs.config().version ) {
+ log() << "replSet config version changed during our election, ignoring result" << rsLog;
+ }
+ else {
+ /* succeeded. */
+ log(1) << "replSet election succeeded, assuming primary role" << rsLog;
+ success = true;
+ rs.assumePrimary();
+ }
+ }
+ }
+ catch( std::exception& ) {
+ // release our yea-lease so others can be elected promptly
+ if( !success ) electionFailed(meid);
+ throw;
+ }
+ if( !success ) electionFailed(meid);
+ }
+
+ // Public entry point for trying to elect ourselves. Filters out callers
+ // that must never stand (arbiters, slave-delayed members) and converts
+ // expected failures into log messages; RetryAfterSleepException propagates
+ // so the caller can retry after the tie backoff.
+ void Consensus::electSelf() {
+ assert( !rs.lockedByMe() );
+ assert( !rs.myConfig().arbiterOnly );
+ assert( rs.myConfig().slaveDelay == 0 );
+ try {
+ _electSelf();
+ }
+ catch(RetryAfterSleepException&) {
+ throw;
+ }
+ catch(VoteException& ) {
+ log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog;
+ }
+ catch(DBException& e) {
+ log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog;
+ }
+ catch(...) {
+ log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
+ }
+ }
+
+}
diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp
new file mode 100644
index 00000000000..0b7ed87eac3
--- /dev/null
+++ b/src/mongo/db/repl/health.cpp
@@ -0,0 +1,449 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "health.h"
+#include "../../util/background.h"
+#include "../../client/dbclient.h"
+#include "../../client/connpool.h"
+#include "../commands.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/task.h"
+#include "../../util/mongoutils/html.h"
+#include "../../util/goodies.h"
+#include "../../util/ramlog.h"
+#include "../helpers/dblogger.h"
+#include "connections.h"
+#include "../../util/unittest.h"
+#include "../dbhelpers.h"
+
+namespace mongo {
+ /* decls for connections.h */
+ // heap-allocated and never freed so the map outlives static destruction order
+ ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M());
+ mutex ScopedConn::mapMutex("ScopedConn::mapMutex");
+}
+
+namespace mongo {
+
+ using namespace mongoutils::html;
+ using namespace bson;
+
+ // in-memory ring log backing fillRsLog()'s web output; rsLog is the Tee
+ // that log() statements append to via "<< rsLog"
+ static RamLog * _rsLog = new RamLog( "rs" );
+ Tee *rsLog = _rsLog;
+ extern bool replSetBlind; // for testing
+
+ // Human-readable "time since t": seconds under 3 minutes, fractional
+ // minutes under an hour, else fractional hours. Empty string for t == 0
+ // (meaning "never").
+ string ago(time_t t) {
+ if( t == 0 ) return "";
+
+ time_t x = time(0) - t;
+ stringstream s;
+ if( x < 180 ) {
+ s << x << " sec";
+ if( x != 1 ) s << 's';
+ }
+ else if( x < 3600 ) {
+ s.precision(2);
+ s << x / 60.0 << " mins";
+ }
+ else {
+ s.precision(2);
+ s << x / 3600.0 << " hrs";
+ }
+ return s.str();
+ }
+
+ // Emit one HTML table row describing this member for the _replSet web page.
+ void Member::summarizeMember(stringstream& s) const {
+ s << tr();
+ {
+ stringstream u;
+ // links to the member's own web console
+ // assumes the http port is the member's port + 1000 -- TODO confirm
+ u << "http://" << h().host() << ':' << (h().port() + 1000) << "/_replSet";
+ s << td( a(u.str(), "", fullName()) );
+ }
+ s << td( id() );
+ // NOTE(review): local 'h' shadows the accessor h() used above; a second
+ // 'string h' shadows again below. Legal but easy to misread.
+ double h = hbinfo().health;
+ bool ok = h > 0;
+ s << td(red(str::stream() << h,h == 0));
+ s << td(ago(hbinfo().upSince));
+ bool never = false;
+ {
+ string h;
+ time_t hb = hbinfo().lastHeartbeat;
+ if( hb == 0 ) {
+ h = "never";
+ never = true;
+ }
+ else h = ago(hb) + " ago";
+ s << td(h);
+ }
+ s << td(config().votes);
+ s << td(config().priority);
+ {
+ string stateText = state().toString();
+ if( _config.hidden )
+ stateText += " (hidden)";
+ if( ok || stateText.empty() )
+ s << td(stateText); // text blank if we've never connected
+ else
+ s << td( grey(str::stream() << "(was " << state().toString() << ')', true) );
+ }
+ s << td( grey(hbinfo().lastHeartbeatMsg,!ok) );
+ stringstream q;
+ q << "/_replSetOplog?_id=" << id();
+ s << td( a(q.str(), "", never ? "?" : hbinfo().opTime.toString()) );
+ // skew is INT_MIN when unknown
+ if( hbinfo().skew > INT_MIN ) {
+ s << td( grey(str::stream() << hbinfo().skew,!ok) );
+ }
+ else
+ s << td("");
+ s << _tr();
+ }
+
+ // Map a MemberState to an HTML anchor whose tooltip explains the state;
+ // unknown states render as an empty string.
+ string ReplSetImpl::stateAsHtml(MemberState s) {
+ if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
+ if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
+ if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
+ if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING");
+ if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "FATAL");
+ if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2");
+ if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER");
+ if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN");
+ if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK");
+ return "";
+ }
+
+ extern time_t started;
+
+ // oplogdiags in web ui
+ // Render one oplog document as an HTML table row: ts (secs + full optime),
+ // hash, op, ns, then all remaining fields.
+ static void say(stringstream&ss, const bo& op) {
+ ss << "<tr>";
+
+ // fields already rendered in their own columns; excluded from the catch-all
+ set<string> skip;
+ be e = op["ts"];
+ if( e.type() == Date || e.type() == Timestamp ) {
+ OpTime ot = e._opTime();
+ ss << td( time_t_to_String_short( ot.getSecs() ) );
+ ss << td( ot.toString() );
+ skip.insert("ts");
+ }
+ else ss << td("?") << td("?");
+
+ e = op["h"];
+ if( e.type() == NumberLong ) {
+ // NOTE(review): the hex manipulator is sticky and is never reset to
+ // dec, so later numeric output on ss is also hex -- confirm intended.
+ ss << "<td>" << hex << e.Long() << "</td>\n";
+ skip.insert("h");
+ }
+ else
+ ss << td("?");
+
+ ss << td(op["op"].valuestrsafe());
+ ss << td(op["ns"].valuestrsafe());
+ skip.insert("op");
+ skip.insert("ns");
+
+ // catch-all column with every field not already shown
+ ss << "<td>";
+ for( bo::iterator i(op); i.more(); ) {
+ be e = i.next();
+ if( skip.count(e.fieldName()) ) continue;
+ ss << e.toString() << ' ';
+ }
+ ss << "</td></tr>\n";
+ }
+
+ // Render the head (first 20) and tail (last 20) of member server_id's oplog
+ // as an HTML table for the web console, connecting to the member directly.
+ void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const {
+ const Member *m = findById(server_id);
+ if( m == 0 ) {
+ ss << "Error : can't find a member with id: " << server_id << '\n';
+ return;
+ }
+
+ ss << p("Server : " + m->fullName() + "<br>ns : " + rsoplog );
+
+ //const bo fields = BSON( "o" << false << "o2" << false );
+ const bo fields;
+
+ /** todo fix we might want an so timeout here */
+ DBClientConnection conn(false, 0, /*timeout*/ 20);
+ {
+ string errmsg;
+ if( !conn.connect(m->fullName(), errmsg) ) {
+ ss << "couldn't connect to " << m->fullName() << ' ' << errmsg;
+ return;
+ }
+ }
+
+ // forward scan: the first 20 entries
+ auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",1), 20, 0, &fields);
+ if( c.get() == 0 ) {
+ ss << "couldn't query " << rsoplog;
+ return;
+ }
+ static const char *h[] = {"ts","optime", "h","op","ns","rest",0};
+
+ ss << "<style type=\"text/css\" media=\"screen\">"
+ "table { font-size:75% }\n"
+ // "th { background-color:#bbb; color:#000 }\n"
+ // "td,th { padding:.25em }\n"
+ "</style>\n";
+
+ ss << table(h, true);
+ //ss << "<pre>\n";
+ int n = 0;
+ OpTime otFirst;
+ OpTime otLast;
+ OpTime otEnd;
+ while( c->more() ) {
+ bo o = c->next();
+ otLast = o["ts"]._opTime();
+ if( otFirst.isNull() )
+ otFirst = otLast;
+ say(ss, o);
+ n++;
+ }
+ if( n == 0 ) {
+ ss << rsoplog << " is empty\n";
+ }
+ else {
+ // reverse scan: collect the tail, stopping when we meet the head;
+ // rows are prepended to x so they end up in forward order.
+ // (this inner 'c' intentionally shadows the head-scan cursor above)
+ auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields);
+ if( c.get() == 0 ) {
+ ss << "couldn't query [2] " << rsoplog;
+ return;
+ }
+ string x;
+ bo o = c->next();
+ otEnd = o["ts"]._opTime();
+ while( 1 ) {
+ stringstream z;
+ if( o["ts"]._opTime() == otLast )
+ break;
+ say(z, o);
+ x = z.str() + x;
+ if( !c->more() )
+ break;
+ o = c->next();
+ }
+ if( !x.empty() ) {
+ // ellipsis row marks the gap between head and tail
+ ss << "<tr><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n" << x;
+ //ss << "\n...\n\n" << x;
+ }
+ }
+ ss << _table();
+ ss << p(time_t_to_String_short(time(0)) + " current time");
+
+ if( !otEnd.isNull() ) {
+ ss << "<p>Log length in time: ";
+ unsigned d = otEnd.getSecs() - otFirst.getSecs();
+ double h = d / 3600.0;
+ ss.precision(3);
+ if( h < 72 )
+ ss << h << " hours";
+ else
+ ss << h / 24.0 << " days";
+ ss << "</p>\n";
+ }
+ }
+
+ // Build the _replSet web page body: a small set-summary table followed by
+ // one row per member (self included), sorted by member id.
+ void ReplSetImpl::_summarizeAsHtml(stringstream& s) const {
+ s << table(0, false);
+ s << tr("Set name:", _name);
+ s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" );
+ s << _table();
+
+ const char *h[] = {"Member",
+ "<a title=\"member id in the replset config\">id</a>",
+ "Up",
+ "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>",
+ "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>",
+ "Votes", "Priority", "State", "Messages",
+ "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>",
+ "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>",
+ 0
+ };
+ s << table(h);
+
+ /* this is to sort the member rows by their ordinal _id, so they show up in the same
+ order on all the different web ui's; that is less confusing for the operator. */
+ map<int,string> mp;
+
+ string myMinValid;
+ try {
+ // best-effort read; may fail to get the lock within 300ms
+ readlocktry lk("local.replset.minvalid", 300);
+ if( lk.got() ) {
+ BSONObj mv;
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ myMinValid = "minvalid:" + mv["ts"]._opTime().toString();
+ }
+ }
+ else myMinValid = ".";
+ }
+ catch(...) {
+ myMinValid = "exception fetching minvalid";
+ }
+
+ const Member *_self = this->_self;
+ assert(_self);
+ {
+ // this inner 's' (the row buffer) intentionally shadows the output param
+ stringstream s;
+ /* self row */
+ s << tr() << td(_self->fullName() + " (me)") <<
+ td(_self->id()) <<
+ td("1") << //up
+ td(ago(started)) <<
+ td("") << // last heartbeat
+ td(ToString(_self->config().votes)) <<
+ td(ToString(_self->config().priority)) <<
+ td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") );
+ s << td( _hbmsg );
+ stringstream q;
+ q << "/_replSetOplog?_id=" << _self->id();
+ s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) );
+ s << td(""); // skew
+ s << _tr();
+ mp[_self->hbinfo().id()] = s.str();
+ }
+ Member *m = head();
+ while( m ) {
+ stringstream s;
+ m->summarizeMember(s);
+ mp[m->hbinfo().id()] = s.str();
+ m = m->next();
+ }
+
+ // emit rows in id order
+ for( map<int,string>::const_iterator i = mp.begin(); i != mp.end(); i++ )
+ s << i->second;
+ s << _table();
+ }
+
+
+ // Dump the in-memory replset ram log as HTML (for the web console).
+ void fillRsLog(stringstream& s) {
+ _rsLog->toHTML( s );
+ }
+
+ // Linear search for the member with the given config id; checks self first,
+ // returns 0 when no member matches.
+ const Member* ReplSetImpl::findById(unsigned id) const {
+ if( _self && id == _self->id() ) return _self;
+
+ for( Member *m = head(); m; m = m->next() )
+ if( m->id() == id )
+ return m;
+ return 0;
+ }
+
+ // Most recent heartbeat-reported optime among the *other* members that are
+ // currently up; OpTime(0,0) if none qualify.
+ const OpTime ReplSetImpl::lastOtherOpTime() const {
+ OpTime closest(0,0);
+
+ for( Member *m = _members.head(); m; m=m->next() ) {
+ if (!m->hbinfo().up()) {
+ continue;
+ }
+
+ if (m->hbinfo().opTime > closest) {
+ closest = m->hbinfo().opTime;
+ }
+ }
+
+ return closest;
+ }
+
+ // Build the replSetGetStatus reply: one document per member (self first,
+ // then heartbeat-derived entries), plus set-level fields.
+ void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const {
+ vector<BSONObj> v;
+
+ const Member *_self = this->_self;
+ assert( _self );
+
+ MemberState myState = box.getState();
+
+ // add self
+ {
+ BSONObjBuilder bb;
+ bb.append("_id", (int) _self->id());
+ bb.append("name", _self->fullName());
+ bb.append("health", 1.0);
+ bb.append("state", (int)myState.s);
+ bb.append("stateStr", myState.toString());
+ bb.append("uptime", (unsigned)(time(0) - cmdLine.started));
+ // arbiters carry no data, so optime is meaningless for them
+ if (!_self->config().arbiterOnly) {
+ bb.appendTimestamp("optime", lastOpTimeWritten.asDate());
+ bb.appendDate("optimeDate", lastOpTimeWritten.getSecs() * 1000LL);
+ }
+
+ int maintenance = _maintenanceMode;
+ if (maintenance) {
+ bb.append("maintenanceMode", maintenance);
+ }
+
+ if (theReplSet) {
+ string s = theReplSet->hbmsg();
+ if( !s.empty() )
+ bb.append("errmsg", s);
+ }
+ bb.append("self", true);
+ v.push_back(bb.obj());
+ }
+
+ Member *m =_members.head();
+ while( m ) {
+ BSONObjBuilder bb;
+ bb.append("_id", (int) m->id());
+ bb.append("name", m->fullName());
+ double h = m->hbinfo().health;
+ bb.append("health", h);
+ bb.append("state", (int) m->state().s);
+ if( h == 0 ) {
+ // if we can't connect the state info is from the past and could be confusing to show
+ bb.append("stateStr", "(not reachable/healthy)");
+ }
+ else {
+ bb.append("stateStr", m->state().toString());
+ }
+ bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0));
+ if (!m->config().arbiterOnly) {
+ bb.appendTimestamp("optime", m->hbinfo().opTime.asDate());
+ bb.appendDate("optimeDate", m->hbinfo().opTime.getSecs() * 1000LL);
+ }
+ bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat);
+ bb.append("pingMs", m->hbinfo().ping);
+ string s = m->lhb();
+ if( !s.empty() )
+ bb.append("errmsg", s);
+
+ if (m->hbinfo().authIssue) {
+ bb.append("authenticated", false);
+ }
+
+ v.push_back(bb.obj());
+ m = m->next();
+ }
+ // sort member docs (BSONObj ordering) so output is stable across calls
+ sort(v.begin(), v.end());
+ b.append("set", name());
+ b.appendTimeT("date", time(0));
+ b.append("myState", myState.s);
+ const Member *syncTarget = _currentSyncTarget;
+ if (syncTarget && myState != MemberState::RS_PRIMARY) {
+ b.append("syncingTo", syncTarget->fullName());
+ }
+ b.append("members", v);
+ if( replSetBlind )
+ b.append("blind",true); // to avoid confusion if set...normally never set except for testing.
+ }
+
+ // startup-time sanity check: default-constructed HealthOptions compare
+ // equal and report isDefault()
+ static struct Test : public UnitTest {
+ void run() {
+ HealthOptions a,b;
+ assert( a == b );
+ assert( a.isDefault() );
+ }
+ } test;
+
+}
diff --git a/src/mongo/db/repl/health.h b/src/mongo/db/repl/health.h
new file mode 100644
index 00000000000..55cca93a27e
--- /dev/null
+++ b/src/mongo/db/repl/health.h
@@ -0,0 +1,50 @@
+// replset.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+namespace mongo {
+
+ /* throws */
+ bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false);
+
+ struct HealthOptions {
+ HealthOptions() :
+ heartbeatSleepMillis(2000),
+ heartbeatTimeoutMillis( 10000 ),
+ heartbeatConnRetries(2)
+ { }
+
+ bool isDefault() const { return *this == HealthOptions(); }
+
+ // see http://www.mongodb.org/display/DOCS/Replica+Set+Internals
+ unsigned heartbeatSleepMillis;
+ unsigned heartbeatTimeoutMillis;
+ unsigned heartbeatConnRetries ;
+
+ void check() {
+ uassert(13112, "bad replset heartbeat option", heartbeatSleepMillis >= 10);
+ uassert(13113, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10);
+ }
+
+ bool operator==(const HealthOptions& r) const {
+ return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==r.heartbeatConnRetries;
+ }
+ };
+
+}
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp
new file mode 100644
index 00000000000..331812af85a
--- /dev/null
+++ b/src/mongo/db/repl/heartbeat.cpp
@@ -0,0 +1,382 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "health.h"
+#include "../../util/background.h"
+#include "../../client/dbclient.h"
+#include "../commands.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/task.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/mongoutils/html.h"
+#include "../../util/goodies.h"
+#include "../../util/ramlog.h"
+#include "../helpers/dblogger.h"
+#include "connections.h"
+#include "../../util/unittest.h"
+#include "../instance.h"
+#include "../repl.h"
+
+namespace mongo {
+
+ using namespace bson;
+
+ extern bool replSetBlind;
+ extern ReplSettings replSettings;
+
+ unsigned int HeartbeatInfo::numPings;
+
+ long long HeartbeatInfo::timeDown() const {
+ if( up() ) return 0;
+ if( downSince == 0 )
+ return 0; // still waiting on first heartbeat
+ return jsTime() - downSince;
+ }
+
+ /* { replSetHeartbeat : <setname> } */
+ class CmdReplSetHeartbeat : public ReplSetCommand {
+ public:
+ CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( replSetBlind ) {
+ if (theReplSet) {
+ errmsg = str::stream() << theReplSet->selfFullName() << " is blind";
+ }
+ return false;
+ }
+
+ /* we don't call ReplSetCommand::check() here because heartbeat
+ checks many things that are pre-initialization. */
+ if( !replSet ) {
+ errmsg = "not running with --replSet";
+ return false;
+ }
+
+ if (!checkAuth(errmsg, result)) {
+ return false;
+ }
+
+ /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */
+ {
+ AbstractMessagingPort *mp = cc().port();
+ if( mp )
+ mp->tag |= 1;
+ }
+
+ if( cmdObj["pv"].Int() != 1 ) {
+ errmsg = "incompatible replset protocol version";
+ return false;
+ }
+ {
+ string s = string(cmdObj.getStringField("replSetHeartbeat"));
+ if( cmdLine.ourSetName() != s ) {
+ errmsg = "repl set names do not match";
+ log() << "replSet set names do not match, our cmdline: " << cmdLine._replSet << rsLog;
+ log() << "replSet s: " << s << rsLog;
+ result.append("mismatch", true);
+ return false;
+ }
+ }
+
+ result.append("rs", true);
+ if( cmdObj["checkEmpty"].trueValue() ) {
+ result.append("hasData", replHasDatabases());
+ }
+ if( theReplSet == 0 ) {
+ string from( cmdObj.getStringField("from") );
+ if( !from.empty() ) {
+ scoped_lock lck( replSettings.discoveredSeeds_mx );
+ replSettings.discoveredSeeds.insert(from);
+ }
+ result.append("hbmsg", "still initializing");
+ return true;
+ }
+
+ if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
+ errmsg = "repl set names do not match (2)";
+ result.append("mismatch", true);
+ return false;
+ }
+ result.append("set", theReplSet->name());
+ result.append("state", theReplSet->state().s);
+ result.append("e", theReplSet->iAmElectable());
+ result.append("hbmsg", theReplSet->hbmsg());
+ result.append("time", (long long) time(0));
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
+ int v = theReplSet->config().version;
+ result.append("v", v);
+ if( v > cmdObj["v"].Int() )
+ result << "config" << theReplSet->config().asBson();
+
+ return true;
+ }
+ } cmdReplSetHeartbeat;
+
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result,
+ int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ if( replSetBlind ) {
+ return false;
+ }
+
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName <<
+ "v" << myCfgVersion <<
+ "pv" << 1 <<
+ "checkEmpty" << checkEmpty <<
+ "from" << from );
+
+ // generally not a great idea to do outbound waiting calls in a
+ // write lock. heartbeats can be slow (multisecond to respond), so
+ // generally we don't want to be locked, at least not without
+ // thinking carefully about it first.
+ uassert(15900, "can't heartbeat: too much lock",
+ !d.dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );
+
+ ScopedConn conn(memberFullName);
+ return conn.runCommand("admin", cmd, result, 0);
+ }
+
+ /**
+ * Poll every other set member to check its status.
+ *
+ * A detail about local machines and authentication: suppose we have 2
+ * members, A and B, on the same machine using different keyFiles. A is
+ * primary. If we're just starting the set, there are no admin users, so A
+ * and B can access each other because it's local access.
+ *
+ * Then we add a user to A. B cannot sync this user from A, because as soon
+ * as we add an admin user, A requires auth. However, A can still
+ * heartbeat B, because B *doesn't* have an admin user. So A can reach B
+ * but B cannot reach A.
+ *
+ * Once B is restarted with the correct keyFile, everything should work as
+ * expected.
+ */
+ class ReplSetHealthPollTask : public task::Task {
+ private:
+ HostAndPort h;
+ HeartbeatInfo m;
+ int tries;
+ const int threshold;
+ public:
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
+ : h(hh), m(mm), tries(0), threshold(15) { }
+
+ string name() const { return "rsHealthPoll"; }
+ void doWork() {
+ if ( !theReplSet ) {
+ LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog;
+ return;
+ }
+
+ HeartbeatInfo mem = m;
+ HeartbeatInfo old = mem;
+ try {
+ BSONObj info;
+ int theirConfigVersion = -10000;
+
+ bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
+
+ // weight new ping with old pings
+ // on the first ping, just use the ping value
+ if (old.ping != 0) {
+ mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
+ }
+
+ if( ok ) {
+ up(info, mem);
+ }
+ else if (!info["errmsg"].eoo() &&
+ info["errmsg"].str() == "need to login") {
+ authIssue(mem);
+ }
+ else {
+ down(mem, info.getStringField("errmsg"));
+ }
+ }
+ catch(DBException& e) {
+ down(mem, e.what());
+ }
+ catch(...) {
+ down(mem, "replSet unexpected exception in ReplSetHealthPollTask");
+ }
+ m = mem;
+
+ theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
+
+ static time_t last = 0;
+ time_t now = time(0);
+ bool changed = mem.changed(old);
+ if( changed ) {
+ if( old.hbstate != mem.hbstate )
+ log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog;
+ }
+ if( changed || now-last>4 ) {
+ last = now;
+ theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+
+ private:
+ bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
+ if (tries++ % threshold == (threshold - 1)) {
+ ScopedConn conn(h.toString());
+ conn.reconnect();
+ }
+
+ Timer timer;
+ time_t before = curTimeMicros64() / 1000000;
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
+ h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ mem.ping = (unsigned int)timer.millis();
+
+ // we set this on any response - we don't get this far if
+ // couldn't connect because exception is thrown
+ time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+
+ if ( info["time"].isNumber() ) {
+ long long t = info["time"].numberLong();
+ if( t > after )
+ mem.skew = (int) (t - after);
+ else if( t < before )
+ mem.skew = (int) (t - before); // negative
+ }
+ else {
+ // it won't be there if remote hasn't initialized yet
+ if( info.hasElement("time") )
+ warning() << "heatbeat.time isn't a number: " << info << endl;
+ mem.skew = INT_MIN;
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() )
+ mem.hbstate = MemberState(state.Int());
+ }
+
+ return ok;
+ }
+
+ void authIssue(HeartbeatInfo& mem) {
+ mem.authIssue = true;
+ mem.hbstate = MemberState::RS_UNKNOWN;
+
+ // set health to 0 so that this doesn't count towards majority
+ mem.health = 0.0;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ void down(HeartbeatInfo& mem, string msg) {
+ mem.authIssue = false;
+ mem.health = 0.0;
+ mem.ping = 0;
+ if( mem.upSince || mem.downSince == 0 ) {
+ mem.upSince = 0;
+ mem.downSince = jsTime();
+ mem.hbstate = MemberState::RS_DOWN;
+ log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
+ }
+ mem.lastHeartbeatMsg = msg;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ void up(const BSONObj& info, HeartbeatInfo& mem) {
+ HeartbeatInfo::numPings++;
+ mem.authIssue = false;
+
+ if( mem.upSince == 0 ) {
+ log() << "replSet member " << h.toString() << " is up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ // see if this member is in the electable set
+ if( info["e"].eoo() ) {
+ // for backwards compatibility
+ const Member *member = theReplSet->findById(mem.id());
+ if (member && member->config().potentiallyHot()) {
+ theReplSet->addToElectable(mem.id());
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+ }
+ // add this server to the electable set if it is within 10
+ // seconds of the latest optime we know of
+ else if( info["e"].trueValue() &&
+ mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
+ unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
+ if (lastOp > 0 && mem.opTime >= lastOp - 10) {
+ theReplSet->addToElectable(mem.id());
+ }
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ be cfg = info["config"];
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
+ }
+ }
+ };
+
+ void ReplSetImpl::endOldHealthTasks() {
+ unsigned sz = healthTasks.size();
+ for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ )
+ (*i)->halt();
+ healthTasks.clear();
+ if( sz )
+ DEV log() << "replSet debug: cleared old tasks " << sz << endl;
+ }
+
+ void ReplSetImpl::startHealthTaskFor(Member *m) {
+ ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());
+ healthTasks.insert(task);
+ task::repeat(task, 2000);
+ }
+
+ void startSyncThread();
+
+ /** called during repl set startup. caller expects it to return fairly quickly.
+ note ReplSet object is only created once we get a config - so this won't run
+ until the initiation.
+ */
+ void ReplSetImpl::startThreads() {
+ task::fork(mgr);
+ mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+
+ boost::thread t(startSyncThread);
+
+ task::fork(ghost);
+
+ // member heartbeats are started in ReplSetImpl::initFromConfig
+ }
+
+}
+
+/* todo:
+ stop bg job and delete on removefromset
+*/
diff --git a/src/mongo/db/repl/manager.cpp b/src/mongo/db/repl/manager.cpp
new file mode 100644
index 00000000000..91648a1b506
--- /dev/null
+++ b/src/mongo/db/repl/manager.cpp
@@ -0,0 +1,274 @@
+/* @file manager.cpp
+*/
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "connections.h"
+#include "../client.h"
+
+namespace mongo {
+
+ enum {
+ NOPRIMARY = -2,
+ SELFPRIMARY = -1
+ };
+
+ /* check members OTHER THAN US to see if they think they are primary */
+ const Member * Manager::findOtherPrimary(bool& two) {
+ two = false;
+ Member *m = rs->head();
+ Member *p = 0;
+ while( m ) {
+ DEV assert( m != rs->_self );
+ if( m->state().primary() && m->hbinfo().up() ) {
+ if( p ) {
+ two = true;
+ return 0;
+ }
+ p = m;
+ }
+ m = m->next();
+ }
+ if( p )
+ noteARemoteIsPrimary(p);
+ return p;
+ }
+
+ Manager::Manager(ReplSetImpl *_rs) :
+ task::Server("rsMgr"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) {
+ }
+
+ Manager::~Manager() {
+ /* we don't destroy the replset object we sit in; however, the destructor could have thrown on init.
+ the log message below is just a reminder to come back one day and review this code more, and to
+ make it cleaner.
+ */
+ log() << "info: ~Manager called" << rsLog;
+ rs->mgr = 0;
+ }
+
+ void Manager::starting() {
+ Client::initThread("rsMgr");
+ replLocalAuth();
+ }
+
+ void Manager::noteARemoteIsPrimary(const Member *m) {
+ if( rs->box.getPrimary() == m )
+ return;
+ rs->_self->lhb() = "";
+ if( rs->iAmArbiterOnly() ) {
+ rs->box.set(MemberState::RS_ARBITER, m);
+ }
+ else {
+ rs->box.noteRemoteIsPrimary(m);
+ }
+ }
+
+ void Manager::checkElectableSet() {
+ unsigned otherOp = rs->lastOtherOpTime().getSecs();
+
+ // make sure the electable set is up-to-date
+ if (rs->elect.aMajoritySeemsToBeUp() &&
+ rs->iAmPotentiallyHot() &&
+ (otherOp == 0 || rs->lastOpTimeWritten.getSecs() >= otherOp - 10)) {
+ theReplSet->addToElectable(rs->selfId());
+ }
+ else {
+ theReplSet->rmFromElectable(rs->selfId());
+ }
+
+ // check if we should ask the primary (possibly ourselves) to step down
+ const Member *highestPriority = theReplSet->getMostElectable();
+ const Member *primary = rs->box.getPrimary();
+
+ if (primary && highestPriority &&
+ highestPriority->config().priority > primary->config().priority) {
+ log() << "stepping down " << primary->fullName() << endl;
+
+ if (primary->h().isSelf()) {
+ // replSetStepDown tries to acquire the same lock
+ // msgCheckNewState takes, so we can't call replSetStepDown on
+ // ourselves.
+ rs->relinquish();
+ }
+ else {
+ BSONObj cmd = BSON( "replSetStepDown" << 1 );
+ ScopedConn conn(primary->fullName());
+ BSONObj result;
+ if (!conn.runCommand("admin", cmd, result, 0)) {
+ log() << "stepping down " << primary->fullName()
+ << " failed: " << result << endl;
+ }
+ }
+ }
+ }
+
+ void Manager::checkAuth() {
+ int down = 0, authIssue = 0, total = 0;
+
+ for( Member *m = rs->head(); m; m=m->next() ) {
+ total++;
+
+ // all authIssue servers will also be not up
+ if (!m->hbinfo().up()) {
+ down++;
+ if (m->hbinfo().authIssue) {
+ authIssue++;
+ }
+ }
+ }
+
+ // if all nodes are down or failed auth AND at least one failed
+ // auth, go into recovering. If all nodes are down, stay a
+ // secondary.
+ if (authIssue > 0 && down == total) {
+ log() << "replset error could not reach/authenticate against any members" << endl;
+
+ if (rs->box.getPrimary() == rs->_self) {
+ log() << "auth problems, relinquishing primary" << rsLog;
+ rs->relinquish();
+ }
+
+ rs->blockSync(true);
+ }
+ else {
+ rs->blockSync(false);
+ }
+ }
+
+ /** called as the health threads get new results */
+ void Manager::msgCheckNewState() {
+ {
+ theReplSet->assertValid();
+ rs->assertValid();
+
+ RSBase::lock lk(rs);
+
+ if( busyWithElectSelf ) return;
+
+ checkElectableSet();
+ checkAuth();
+
+ const Member *p = rs->box.getPrimary();
+ if( p && p != rs->_self ) {
+ if( !p->hbinfo().up() ||
+ !p->hbinfo().hbstate.primary() ) {
+ p = 0;
+ rs->box.setOtherPrimary(0);
+ }
+ }
+
+ const Member *p2;
+ {
+ bool two;
+ p2 = findOtherPrimary(two);
+ if( two ) {
+ /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
+ log() << "replSet info two primaries (transiently)" << rsLog;
+ return;
+ }
+ }
+
+ if( p2 ) {
+ /* someone else thinks they are primary. */
+ if( p == p2 ) {
+ // we thought the same; all set.
+ return;
+ }
+ if( p == 0 ) {
+ noteARemoteIsPrimary(p2);
+ return;
+ }
+ // todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+ if( p != rs->_self ) {
+ // switch primary from oldremotep->newremotep2
+ noteARemoteIsPrimary(p2);
+ return;
+ }
+ /* we thought we were primary, yet now someone else thinks they are. */
+ if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ /* we can't see a majority. so the other node is probably the right choice. */
+ noteARemoteIsPrimary(p2);
+ return;
+ }
+ /* ignore for now, keep thinking we are master.
+ this could just be timing (we poll every couple seconds) or could indicate
+ a problem? if it happens consistently for a duration of time we should
+ alert the sysadmin.
+ */
+ return;
+ }
+
+ /* didn't find anyone who wants to be primary */
+
+ if( p ) {
+ /* we are already primary */
+
+ if( p != rs->_self ) {
+ rs->sethbmsg("error p != rs->self in checkNewState");
+ log() << "replSet " << p->fullName() << rsLog;
+ log() << "replSet " << rs->_self->fullName() << rsLog;
+ return;
+ }
+
+ if( rs->elect.shouldRelinquish() ) {
+ log() << "can't see a majority of the set, relinquishing primary" << rsLog;
+ rs->relinquish();
+ }
+
+ return;
+ }
+
+ if( !rs->iAmPotentiallyHot() ) { // if not we never try to be primary
+ OCCASIONALLY log() << "replSet I don't see a primary and I can't elect myself" << endl;
+ return;
+ }
+
+ /* no one seems to be primary. shall we try to elect ourself? */
+ if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ static time_t last;
+ static int n;
+ int ll = 0;
+ if( ++n > 5 ) ll++;
+ if( last + 60 > time(0 ) ) ll++;
+ log(ll) << "replSet can't see a majority, will not try to elect self" << rsLog;
+ last = time(0);
+ return;
+ }
+
+ if( !rs->iAmElectable() ) {
+ return;
+ }
+
+ busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one.
+ }
+ try {
+ rs->elect.electSelf();
+ }
+ catch(RetryAfterSleepException&) {
+ /* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */
+ requeue();
+ }
+ catch(...) {
+ log() << "replSet error unexpected assertion in rs manager" << rsLog;
+ }
+ busyWithElectSelf = false;
+ }
+
+}
diff --git a/src/mongo/db/repl/multicmd.h b/src/mongo/db/repl/multicmd.h
new file mode 100644
index 00000000000..2d70c551f64
--- /dev/null
+++ b/src/mongo/db/repl/multicmd.h
@@ -0,0 +1,75 @@
+// @file multicmd.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../../util/background.h"
+#include "connections.h"
+
+namespace mongo {
+
+ struct Target {
+ Target(string hostport) : toHost(hostport), ok(false) { }
+ //Target() : ok(false) { }
+ const string toHost;
+ bool ok;
+ BSONObj result;
+ };
+
+ /** send a command to several servers in parallel. waits for all to complete before
+ returning.
+
+ in: Target::toHost
+ out: Target::result and Target::ok
+ */
+ void multiCommand(BSONObj cmd, list<Target>& L);
+
+ class _MultiCommandJob : public BackgroundJob {
+ public:
+ BSONObj& cmd;
+ Target& d;
+ _MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { }
+
+ private:
+ string name() const { return "MultiCommandJob"; }
+ void run() {
+ try {
+ ScopedConn c(d.toHost);
+ d.ok = c.runCommand("admin", cmd, d.result);
+ }
+ catch(DBException&) {
+ DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog;
+ }
+ }
+ };
+
+ inline void multiCommand(BSONObj cmd, list<Target>& L) {
+ list< shared_ptr<BackgroundJob> > jobs;
+
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ Target& d = *i;
+ _MultiCommandJob *j = new _MultiCommandJob(cmd, d);
+ jobs.push_back( shared_ptr<BackgroundJob>(j) );
+ j->go();
+ }
+
+ for( list< shared_ptr<BackgroundJob> >::iterator i = jobs.begin(); i != jobs.end(); i++ ) {
+ (*i)->wait();
+ }
+ }
+}
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
new file mode 100644
index 00000000000..84f16e53466
--- /dev/null
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -0,0 +1,404 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../cmdline.h"
+#include "../commands.h"
+#include "../repl.h"
+#include "health.h"
+#include "rs.h"
+#include "rs_config.h"
+#include "../dbwebserver.h"
+#include "../../util/mongoutils/html.h"
+#include "../../client/dbclient.h"
+#include "../repl_block.h"
+
+using namespace bson;
+
+namespace mongo {
+
+ void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial);
+
+ /* commands in other files:
+ replSetHeartbeat - health.cpp
+ replSetInitiate - rs_mod.cpp
+ */
+
+ bool replSetBlind = false;
+ unsigned replSetForceInitialSyncFailure = 0;
+
+ class CmdReplSetTest : public ReplSetCommand {
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "Just for regression tests.\n";
+ }
+ CmdReplSetTest() : ReplSetCommand("replSetTest") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog;
+
+ if (!checkAuth(errmsg, result)) {
+ return false;
+ }
+
+ if( cmdObj.hasElement("forceInitialSyncFailure") ) {
+ replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number();
+ return true;
+ }
+
+ if( !check(errmsg, result) )
+ return false;
+
+ if( cmdObj.hasElement("blind") ) {
+ replSetBlind = cmdObj.getBoolField("blind");
+ return true;
+ }
+
+ if (cmdObj.hasElement("sethbmsg")) {
+ replset::sethbmsg(cmdObj["sethbmsg"].String());
+ return true;
+ }
+
+ return false;
+ }
+ } cmdReplSetTest;
+
+ /** get rollback id. used to check if a rollback happened during some interval of time.
+ as consumed, the rollback id is not in any particular order, it simply changes on each rollback.
+ @see incRBID()
+ */
+ class CmdReplSetGetRBID : public ReplSetCommand {
+ public:
+ /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */
+ int rbid;
+ virtual void help( stringstream &help ) const {
+ help << "internal";
+ }
+ CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") {
+ // this is ok but micros or combo with some rand() and/or 64 bits might be better --
+ // imagine a restart and a clock correction simultaneously (very unlikely but possible...)
+ rbid = (int) curTimeMillis64();
+ }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ result.append("rbid",rbid);
+ return true;
+ }
+ } cmdReplSetRBID;
+
+ /** we increment the rollback id on every rollback event. */
+ void incRBID() {
+ cmdReplSetRBID.rbid++;
+ }
+
+ /** helper to get rollback id from another server. */
+ int getRBID(DBClientConnection *c) {
+ bo info;
+ c->simpleCommand("admin", &info, "replSetGetRBID");
+ return info["rbid"].numberInt();
+ }
+
+ class CmdReplSetGetStatus : public ReplSetCommand {
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "Report status of a replica set from the POV of this server\n";
+ help << "{ replSetGetStatus : 1 }";
+ help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+ CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if ( cmdObj["forShell"].trueValue() )
+ lastError.disableForCommand();
+
+ if( !check(errmsg, result) )
+ return false;
+ theReplSet->summarizeStatus(result);
+ return true;
+ }
+ } cmdReplSetGetStatus;
+
+ class CmdReplSetReconfig : public ReplSetCommand {
+ RWLock mutex; /* we don't need rw but we wanted try capability. :-( */
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "Adjust configuration of a replica set\n";
+ help << "{ replSetReconfig : config_object }";
+ help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+ CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { }
+ virtual bool run(const string& a, BSONObj& b, int e, string& errmsg, BSONObjBuilder& c, bool d) {
+ try {
+ rwlock_try_write lk(mutex);
+ return _run(a,b,e,errmsg,c,d);
+ }
+ catch(rwlock_try_write::exception&) { }
+ errmsg = "a replSetReconfig is already in progress";
+ return false;
+ }
+ private:
+ bool _run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if ( !checkAuth(errmsg, result) ) {
+ return false;
+ }
+
+ if( cmdObj["replSetReconfig"].type() != Object ) {
+ errmsg = "no configuration specified";
+ return false;
+ }
+
+ bool force = cmdObj.hasField("force") && cmdObj["force"].trueValue();
+ if( force && !theReplSet ) {
+ replSettings.reconfig = cmdObj["replSetReconfig"].Obj().getOwned();
+ result.append("msg", "will try this config momentarily, try running rs.conf() again in a few seconds");
+ return true;
+ }
+
+ if ( !check(errmsg, result) ) {
+ return false;
+ }
+
+ if( !force && !theReplSet->box.getState().primary() ) {
+ errmsg = "replSetReconfig command must be sent to the current replica set primary.";
+ return false;
+ }
+
+ {
+ // just make sure we can get a write lock before doing anything else. we'll reacquire one
+ // later. of course it could be stuck then, but this check lowers the risk if weird things
+ // are up - we probably don't want a change to apply 30 minutes after the initial attempt.
+ time_t t = time(0);
+ writelock lk("");
+ if( time(0)-t > 20 ) {
+ errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
+ return false;
+ }
+ }
+
+ try {
+ ReplSetConfig newConfig(cmdObj["replSetReconfig"].Obj(), force);
+
+ log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog;
+
+ if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) {
+ return false;
+ }
+
+ checkMembersUpForConfigChange(newConfig, result, false);
+
+ log() << "replSet replSetReconfig [2]" << rsLog;
+
+ theReplSet->haveNewConfig(newConfig, true);
+ ReplSet::startupStatusMsg.set("replSetReconfig'd");
+ }
+ catch( DBException& e ) {
+ log() << "replSet replSetReconfig exception: " << e.what() << rsLog;
+ throw;
+ }
+ catch( string& se ) {
+ log() << "replSet reconfig exception: " << se << rsLog;
+ errmsg = se;
+ return false;
+ }
+
+ resetSlaveCache();
+ return true;
+ }
+ } cmdReplSetReconfig;
+
+ class CmdReplSetFreeze : public ReplSetCommand {
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "{ replSetFreeze : <seconds> }";
+ help << "'freeze' state of member to the extent we can do that. What this really means is that\n";
+ help << "this node will not attempt to become primary until the time period specified expires.\n";
+ help << "You can call again with {replSetFreeze:0} to unfreeze sooner.\n";
+ help << "A process restart unfreezes the member also.\n";
+ help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+
+ CmdReplSetFreeze() : ReplSetCommand("replSetFreeze") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ int secs = (int) cmdObj.firstElement().numberInt();
+ if( theReplSet->freeze(secs) ) {
+ if( secs == 0 )
+ result.append("info","unfreezing");
+ }
+ if( secs == 1 )
+ result.append("warning", "you really want to freeze for only 1 second?");
+ return true;
+ }
+ } cmdReplSetFreeze;
+
+ class CmdReplSetStepDown: public ReplSetCommand {
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "{ replSetStepDown : <seconds> }\n";
+ help << "Step down as primary. Will not try to reelect self for the specified time period (1 minute if no numeric secs value specified).\n";
+ help << "(If another member with same priority takes over in the meantime, it will stay primary.)\n";
+ help << "http://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+
+ CmdReplSetStepDown() : ReplSetCommand("replSetStepDown") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ if( !theReplSet->box.getState().primary() ) {
+ errmsg = "not primary so can't step down";
+ return false;
+ }
+
+ bool force = cmdObj.hasField("force") && cmdObj["force"].trueValue();
+
+ // only step down if there is another node synced to within 10
+ // seconds of this node
+ if (!force) {
+ long long int lastOp = (long long int)theReplSet->lastOpTimeWritten.getSecs();
+ long long int closest = (long long int)theReplSet->lastOtherOpTime().getSecs();
+
+ long long int diff = lastOp - closest;
+ result.append("closest", closest);
+ result.append("difference", diff);
+
+ if (diff < 0) {
+ // not our problem, but we'll wait until things settle down
+ errmsg = "someone is ahead of the primary?";
+ return false;
+ }
+
+ if (diff > 10) {
+ errmsg = "no secondaries within 10 seconds of my optime";
+ return false;
+ }
+ }
+
+ int secs = (int) cmdObj.firstElement().numberInt();
+ if( secs == 0 )
+ secs = 60;
+ return theReplSet->stepDown(secs);
+ }
+ } cmdReplSetStepDown;
+
+ class CmdReplSetMaintenance: public ReplSetCommand {
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "{ replSetMaintenance : bool }\n";
+ help << "Enable or disable maintenance mode.";
+ }
+
+ CmdReplSetMaintenance() : ReplSetCommand("replSetMaintenance") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ if( theReplSet->box.getState().primary() ) {
+ errmsg = "primaries can't modify maintenance mode";
+ return false;
+ }
+
+ theReplSet->setMaintenanceMode(cmdObj["replSetMaintenance"].trueValue());
+ return true;
+ }
+ } cmdReplSetMaintenance;
+
+ using namespace bson;
+ using namespace mongoutils::html;
+ extern void fillRsLog(stringstream&);
+
+ class ReplSetHandler : public DbWebHandler {
+ public:
+ ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ) {}
+
+ virtual bool handles( const string& url ) const {
+ return startsWith( url , "/_replSet" );
+ }
+
+ virtual void handle( const char *rq, string url, BSONObj params,
+ string& responseMsg, int& responseCode,
+ vector<string>& headers, const SockAddr &from ) {
+
+ if( url == "/_replSetOplog" ) {
+ responseMsg = _replSetOplog(params);
+ }
+ else
+ responseMsg = _replSet();
+ responseCode = 200;
+ }
+
+ string _replSetOplog(bo parms) {
+ int _id = (int) str::toUnsigned( parms["_id"].String() );
+
+ stringstream s;
+ string t = "Replication oplog";
+ s << start(t);
+ s << p(t);
+
+ if( theReplSet == 0 ) {
+ if( cmdLine._replSet.empty() )
+ s << p("Not using --replSet");
+ else {
+ s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+ + ".<br>" + ReplSet::startupStatusMsg.get());
+ }
+ }
+ else {
+ try {
+ theReplSet->getOplogDiagsAsHtml(_id, s);
+ }
+ catch(std::exception& e) {
+ s << "error querying oplog: " << e.what() << '\n';
+ }
+ }
+
+ s << _end();
+ return s.str();
+ }
+
+ /* /_replSet show replica set status in html format */
+ string _replSet() {
+ stringstream s;
+ s << start("Replica Set Status " + prettyHostName());
+ s << p( a("/", "back", "Home") + " | " +
+ a("/local/system.replset/?html=1", "", "View Replset Config") + " | " +
+ a("/replSetGetStatus?text=1", "", "replSetGetStatus") + " | " +
+ a("http://www.mongodb.org/display/DOCS/Replica+Sets", "", "Docs")
+ );
+
+ if( theReplSet == 0 ) {
+ if( cmdLine._replSet.empty() )
+ s << p("Not using --replSet");
+ else {
+ s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+ + ".<br>" + ReplSet::startupStatusMsg.get());
+ }
+ }
+ else {
+ try {
+ theReplSet->summarizeAsHtml(s);
+ }
+ catch(...) { s << "error summarizing replset status\n"; }
+ }
+ s << p("Recent replset log activity:");
+ fillRsLog(s);
+ s << _end();
+ return s.str();
+ }
+
+
+
+ } replSetHandler;
+
+}
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
new file mode 100644
index 00000000000..fff5d72bcc0
--- /dev/null
+++ b/src/mongo/db/repl/rs.cpp
@@ -0,0 +1,778 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../cmdline.h"
+#include "../../util/net/sock.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "../dbhelpers.h"
+#include "../../s/d_logic.h"
+#include "rs.h"
+#include "connections.h"
+#include "../repl.h"
+#include "../instance.h"
+
+using namespace std;
+
+namespace mongo {
+
+ using namespace bson;
+
+ bool replSet = false;
+ ReplSet *theReplSet = 0;
+
+ bool isCurrentlyAReplSetPrimary() {
+ return theReplSet && theReplSet->isPrimary();
+ }
+
+ void replset::sethbmsg(const string& s, const int level) {
+ if (theReplSet) {
+ theReplSet->sethbmsg(s, logLevel);
+ }
+ }
+
+ void ReplSetImpl::sethbmsg(string s, int logLevel) {
+ static time_t lastLogged;
+ _hbmsgTime = time(0);
+
+ if( s == _hbmsg ) {
+ // unchanged
+ if( _hbmsgTime - lastLogged < 60 )
+ return;
+ }
+
+ unsigned sz = s.size();
+ if( sz >= 256 )
+ memcpy(_hbmsg, s.c_str(), 255);
+ else {
+ _hbmsg[sz] = 0;
+ memcpy(_hbmsg, s.c_str(), sz);
+ }
+ if( !s.empty() ) {
+ lastLogged = _hbmsgTime;
+ log(logLevel) << "replSet " << s << rsLog;
+ }
+ }
+
    /* transition this member into the PRIMARY state after winning an election.
       requires that we are electable (not an arbiter, priority > 0). */
    void ReplSetImpl::assumePrimary() {
        LOG(2) << "replSet assuming primary" << endl;
        assert( iAmPotentiallyHot() );
        writelock lk("admin."); // so we are synchronized with _logOp()

        // Make sure that new OpTimes are higher than existing ones even with clock skew
        DBDirectClient c;
        BSONObj lastOp = c.findOne( "local.oplog.rs", Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk );
        if ( !lastOp.isEmpty() ) {
            OpTime::setLast( lastOp[ "ts" ].date() );
        }

        changeState(MemberState::RS_PRIMARY);
    }
+
+ void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
+
    /* enter or leave maintenance mode.  maintenance mode is reference-counted
       (_maintenanceMode): each enter must be paired with a leave.  while in
       maintenance the member reports RS_RECOVERING.
       @param inc true to enter (increment), false to leave (decrement) */
    void ReplSetImpl::setMaintenanceMode(const bool inc) {
        lock lk(this);

        if (inc) {
            // log the count of tasks already in maintenance before incrementing
            log() << "replSet going into maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog;

            _maintenanceMode++;
            changeState(MemberState::RS_RECOVERING);
        }
        else {
            _maintenanceMode--;
            // no need to change state, syncTail will try to go live as a secondary soon

            log() << "leaving maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog;
        }
    }
+
+ Member* ReplSetImpl::getMostElectable() {
+ lock lk(this);
+
+ Member *max = 0;
+
+ for (set<unsigned>::iterator it = _electableSet.begin(); it != _electableSet.end(); it++) {
+ const Member *temp = findById(*it);
+ if (!temp) {
+ log() << "couldn't find member: " << *it << endl;
+ _electableSet.erase(*it);
+ continue;
+ }
+ if (!max || max->config().priority < temp->config().priority) {
+ max = (Member*)temp;
+ }
+ }
+
+ return max;
+ }
+
+ const bool closeOnRelinquish = true;
+
+ void ReplSetImpl::relinquish() {
+ LOG(2) << "replSet attempting to relinquish" << endl;
+ if( box.getState().primary() ) {
+ {
+ writelock lk("admin."); // so we are synchronized with _logOp()
+
+ log() << "replSet relinquishing primary state" << rsLog;
+ changeState(MemberState::RS_SECONDARY);
+ }
+
+ if( closeOnRelinquish ) {
+ /* close sockets that were talking to us so they don't blithly send many writes that will fail
+ with "not master" (of course client could check result code, but in case they are not)
+ */
+ log() << "replSet closing client sockets after reqlinquishing primary" << rsLog;
+ MessagingPort::closeAllSockets(1);
+ }
+
+ // now that all connections were closed, strip this mongod from all sharding details
+ // if and when it gets promoted to a primary again, only then it should reload the sharding state
+ // the rationale here is that this mongod won't bring stale state when it regains primaryhood
+ shardingState.resetShardingState();
+
+ }
+ else if( box.getState().startup2() ) {
+ // ? add comment
+ changeState(MemberState::RS_RECOVERING);
+ }
+ }
+
+ /* look freshly for who is primary - includes relinquishing ourself. */
+ void ReplSetImpl::forgetPrimary() {
+ if( box.getState().primary() )
+ relinquish();
+ else {
+ box.setOtherPrimary(0);
+ }
+ }
+
+ // for the replSetStepDown command
+ bool ReplSetImpl::_stepDown(int secs) {
+ lock lk(this);
+ if( box.getState().primary() ) {
+ elect.steppedDown = time(0) + secs;
+ log() << "replSet info stepping down as primary secs=" << secs << rsLog;
+ relinquish();
+ return true;
+ }
+ return false;
+ }
+
+ bool ReplSetImpl::_freeze(int secs) {
+ lock lk(this);
+ /* note if we are primary we remain primary but won't try to elect ourself again until
+ this time period expires.
+ */
+ if( secs == 0 ) {
+ elect.steppedDown = 0;
+ log() << "replSet info 'unfreezing'" << rsLog;
+ }
+ else {
+ if( !box.getState().primary() ) {
+ elect.steppedDown = time(0) + secs;
+ log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog;
+ }
+ else {
+ log() << "replSet info received freeze command but we are primary" << rsLog;
+ }
+ }
+ return true;
+ }
+
+ void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
+ for( Member *m = _members.head(); m; m=m->next() ) {
+ if( m->id() == h.id() ) {
+ m->_hbinfo = h;
+ return;
+ }
+ }
+ }
+
+ list<HostAndPort> ReplSetImpl::memberHostnames() const {
+ list<HostAndPort> L;
+ L.push_back(_self->h());
+ for( Member *m = _members.head(); m; m = m->next() )
+ L.push_back(m->h());
+ return L;
+ }
+
+ void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) {
+ assert( m );
+ if( m->config().hidden )
+ return;
+
+ if( m->potentiallyHot() ) {
+ hosts.push_back(m->h().toString());
+ }
+ else if( !m->config().arbiterOnly ) {
+ if( m->config().slaveDelay ) {
+ /* hmmm - we don't list these as they are stale. */
+ }
+ else {
+ passives.push_back(m->h().toString());
+ }
+ }
+ else {
+ arbiters.push_back(m->h().toString());
+ }
+ }
+
+ void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) {
+ lock lk(this);
+
+ const StateBox::SP sp = box.get();
+ bool isp = sp.state.primary();
+ b.append("setName", name());
+ b.append("ismaster", isp);
+ b.append("secondary", sp.state.secondary());
+ {
+ vector<string> hosts, passives, arbiters;
+ _fillIsMasterHost(_self, hosts, passives, arbiters);
+
+ for( Member *m = _members.head(); m; m = m->next() ) {
+ assert( m );
+ _fillIsMasterHost(m, hosts, passives, arbiters);
+ }
+
+ if( hosts.size() > 0 ) {
+ b.append("hosts", hosts);
+ }
+ if( passives.size() > 0 ) {
+ b.append("passives", passives);
+ }
+ if( arbiters.size() > 0 ) {
+ b.append("arbiters", arbiters);
+ }
+ }
+
+ if( !isp ) {
+ const Member *m = sp.primary;
+ if( m )
+ b.append("primary", m->h().toString());
+ }
+ else {
+ b.append("primary", _self->fullName());
+ }
+
+ if( myConfig().arbiterOnly )
+ b.append("arbiterOnly", true);
+ if( myConfig().priority == 0 && !myConfig().arbiterOnly)
+ b.append("passive", true);
+ if( myConfig().slaveDelay )
+ b.append("slaveDelay", myConfig().slaveDelay);
+ if( myConfig().hidden )
+ b.append("hidden", true);
+ if( !myConfig().buildIndexes )
+ b.append("buildIndexes", false);
+ if( !myConfig().tags.empty() ) {
+ BSONObjBuilder a;
+ for( map<string,string>::const_iterator i = myConfig().tags.begin(); i != myConfig().tags.end(); i++ )
+ a.append((*i).first, (*i).second);
+ b.append("tags", a.done());
+ }
+ b.append("me", myConfig().h.toString());
+ }
+
    /** parse the --replSet command line parameter.
        @param cfgString <setname>/<seedhost1>,<seedhost2>
        @param setname  out: the set name portion (required)
        @param seeds    out: seed hosts, excluding ourself
        @param seedSet  out: every parsed seed host (used for duplicate detection)
        throws (via uassert) on an empty set name, an unparsable host, or a duplicate seed. */

    void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) {
        const char *p = cfgString.c_str();
        const char *slash = strchr(p, '/');
        if( slash )
            setname = string(p, slash-p);
        else
            setname = p;
        uassert(13093, "bad --replSet config string format is: <setname>[/<seedhost1>,<seedhost2>,...]", !setname.empty());

        if( slash == 0 )
            return;

        // walk the comma-separated seed list following the slash
        p = slash + 1;
        while( 1 ) {
            const char *comma = strchr(p, ',');
            if( comma == 0 ) comma = strchr(p,0); // last segment: point at the terminating NUL
            if( p == comma )
                break;
            {
                HostAndPort m;
                try {
                    m = HostAndPort( string(p, comma-p) );
                }
                catch(...) {
                    uassert(13114, "bad --replSet seed hostname", false);
                }
                uassert(13096, "bad --replSet command line config string - dups?", seedSet.count(m) == 0 );
                seedSet.insert(m);
                //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost());
                if( m.isSelf() ) {
                    // our own address is recorded in seedSet but not used as a seed
                    log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog;
                }
                else
                    seeds.push_back(m);
                if( *comma == 0 )
                    break;
                p = comma + 1;
            }
        }
    }
+
    /* construct the set object: state starts at RS_STARTUP, then loadConfig()
       is called synchronously (it may block while contacting seeds).  afterwards
       we warn about seed-list entries not present in the loaded config. */
    ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
        _currentSyncTarget(0),
        _blockSync(false),
        _hbmsgTime(0),
        _self(0),
        _maintenanceMode(0),
        mgr( new Manager(this) ),
        ghost( new GhostSync(this) ) {

        _cfg = 0;
        memset(_hbmsg, 0, sizeof(_hbmsg));
        strcpy( _hbmsg , "initial startup" );
        lastH = 0;
        changeState(MemberState::RS_STARTUP);

        _seeds = &replSetCmdline.seeds;

        LOG(1) << "replSet beginning startup..." << rsLog;

        loadConfig();

        // drop every configured member from the seed set; whatever remains was
        // given on the command line but is not part of the loaded configuration
        unsigned sss = replSetCmdline.seedSet.size();
        for( Member *m = head(); m; m = m->next() ) {
            replSetCmdline.seedSet.erase(m->h());
        }
        for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) {
            if( i->isSelf() ) {
                if( sss == 1 ) {
                    LOG(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog;
                }
            }
            else {
                log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog;
            }
        }
    }
+
+ void newReplUp();
+
    /* read the newest entry of the local oplog and cache its hash (lastH) and
       optime (lastOpTimeWritten).  no-op if the oplog is empty.
       @param quiet if true, tolerate a null optime in the entry instead of asserting */
    void ReplSetImpl::loadLastOpTimeWritten(bool quiet) {
        readlock lk(rsoplog);
        BSONObj o;
        if( Helpers::getLast(rsoplog, o) ) {
            lastH = o["h"].numberLong();
            lastOpTimeWritten = o["ts"]._opTime();
            uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull());
        }
    }
+
    /* call after constructing to start - returns fairly quickly after launching its threads */
    void ReplSetImpl::_go() {
        try {
            loadLastOpTimeWritten();
        }
        catch(std::exception& e) {
            // without a readable oplog we cannot replicate: log, wait, and shut down
            log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
            log() << e.what() << rsLog;
            sleepsecs(30);
            dbexit( EXIT_REPLICATION_ERROR );
            return;
        }

        changeState(MemberState::RS_STARTUP2);
        startThreads();
        newReplUp(); // oplog.cpp
    }
+
+ ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART;
+ DiagStr ReplSetImpl::startupStatusMsg;
+
+ extern BSONObj *getLastErrorDefault;
+
+ void ReplSetImpl::setSelfTo(Member *m) {
+ // already locked in initFromConfig
+ _self = m;
+ _id = m->id();
+ _config = m->config();
+ if( m ) _buildIndexes = m->config().buildIndexes;
+ else _buildIndexes = true;
+ }
+
    /** adopt configuration c: either additively (new members only) or by a full
        member-list rebuild.
        @param reconf true if this is a reconfiguration and not an initial load of the configuration.
        @return true if ok; throws if config really bad; false if config doesn't include self
    */
    bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) {
        /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
           we cannot error out at this point, except fatally. Check errors earlier.
        */
        lock lk(this);

        if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) {
            // see comment in dbcommands.cpp for getlasterrordefault
            getLastErrorDefault = new BSONObj( c.getLastErrorDefaults );
        }

        list<ReplSetConfig::MemberCfg*> newOnes;
        // additive short-cuts the new config setup. If we are just adding a
        // node/nodes and nothing else is changing, this is additive. If it's
        // not a reconfig, we're not adding anything
        bool additive = reconf;
        {
            unsigned nfound = 0;
            int me = 0;
            for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {

                ReplSetConfig::MemberCfg& m = *i;
                if( m.h.isSelf() ) {
                    me++;
                }

                if( reconf ) {
                    // our own _id must not change across a reconfig
                    if (m.h.isSelf() && (!_self || (int)_self->id() != m._id)) {
                        log() << "self doesn't match: " << m._id << rsLog;
                        assert(false);
                    }

                    const Member *old = findById(m._id);
                    if( old ) {
                        nfound++;
                        assert( (int) old->id() == m._id );
                        if( old->config() != m ) {
                            // an existing member's config changed: must do a full reload
                            additive = false;
                        }
                    }
                    else {
                        newOnes.push_back(&m);
                    }
                }
            }
            if( me == 0 ) {
                // we are not in the new config: orphan everything and hold
                _members.orphanAll();

                // sending hbs must continue to pick up new config, so we leave
                // hb threads alone

                // close sockets to force clients to re-evaluate this member
                MessagingPort::closeAllSockets(0);

                // stop sync thread
                box.set(MemberState::RS_STARTUP, 0);

                // go into holding pattern
                log() << "replSet error self not present in the repl set configuration:" << rsLog;
                log() << c.toString() << rsLog;
                return false;
            }
            uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 );

            // if we found different members that the original config, reload everything
            if( reconf && config().members.size() != nfound )
                additive = false;
        }

        _cfg = new ReplSetConfig(c);
        assert( _cfg->ok() );
        assert( _name.empty() || _name == _cfg->_id );
        _name = _cfg->_id;
        assert( !_name.empty() );

        // this is a shortcut for simple changes
        if( additive ) {
            log() << "replSet info : additive change to configuration" << rsLog;
            for( list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) {
                ReplSetConfig::MemberCfg *m = *i;
                Member *mi = new Member(m->h, m->_id, m, false);

                /** we will indicate that new members are up() initially so that we don't relinquish our
                    primary state because we can't (transiently) see a majority. they should be up as we
                    check that new members are up before getting here on reconfig anyway.
                */
                mi->get_hbinfo().health = 0.1;

                _members.push(mi);
                startHealthTaskFor(mi);
            }

            // if we aren't creating new members, we may have to update the
            // groups for the current ones
            _cfg->updateMembers(_members);

            return true;
        }

        // start with no members. if this is a reconfig, drop the old ones.
        _members.orphanAll();

        endOldHealthTasks();

        // remember who was primary so the same member can be re-marked below
        int oldPrimaryId = -1;
        {
            const Member *p = box.getPrimary();
            if( p )
                oldPrimaryId = p->id();
        }
        forgetPrimary();

        // not setting _self to 0 as other threads use _self w/o locking
        int me = 0;

        // For logging
        string members = "";

        for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
            ReplSetConfig::MemberCfg& m = *i;
            Member *mi;
            members += ( members == "" ? "" : ", " ) + m.h.toString();
            if( m.h.isSelf() ) {
                assert( me++ == 0 );
                mi = new Member(m.h, m._id, &m, true);
                if (!reconf) {
                    log() << "replSet I am " << m.h.toString() << rsLog;
                }
                setSelfTo(mi);

                if( (int)mi->id() == oldPrimaryId )
                    box.setSelfPrimary(mi);
            }
            else {
                mi = new Member(m.h, m._id, &m, false);
                _members.push(mi);
                startHealthTaskFor(mi);
                if( (int)mi->id() == oldPrimaryId )
                    box.setOtherPrimary(mi);
            }
        }

        if( me == 0 ){
            log() << "replSet warning did not detect own host in full reconfig, members " << members << " config: " << c << rsLog;
        }

        return true;
    }
+
+ // Our own config must be the first one.
+ bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
+ int v = -1;
+ ReplSetConfig *highest = 0;
+ int myVersion = -2000;
+ int n = 0;
+ for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {
+ ReplSetConfig& cfg = *i;
+ if( ++n == 1 ) myVersion = cfg.version;
+ if( cfg.ok() && cfg.version > v ) {
+ highest = &cfg;
+ v = cfg.version;
+ }
+ }
+ assert( highest );
+
+ if( !initFromConfig(*highest) )
+ return false;
+
+ if( highest->version > myVersion && highest->version >= 0 ) {
+ log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog;
+ highest->saveConfigLocally(BSONObj());
+ }
+ return true;
+ }
+
    /* load the set configuration, looping (with sleeps) until a usable config
       loads or a fatal error occurs.  candidate configs are gathered from, in
       order: our local copy, the --replSet seed hosts, any discovered seeds,
       and a forced reconfig object.  updates startupStatus/startupStatusMsg
       throughout so diagnostics can report progress. */
    void ReplSetImpl::loadConfig() {
        while( 1 ) {
            startupStatus = LOADINGCONFIG;
            startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)");
            LOG(1) << "loadConfig() " << rsConfigNs << endl;
            try {
                vector<ReplSetConfig> configs;
                try {
                    // our own local copy first -- _loadConfigFinish relies on this ordering
                    configs.push_back( ReplSetConfig(HostAndPort::me()) );
                }
                catch(DBException& e) {
                    log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog;
                }
                for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {
                    try {
                        configs.push_back( ReplSetConfig(*i) );
                    }
                    catch( DBException& e ) {
                        log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog;
                    }
                }
                {
                    scoped_lock lck( replSettings.discoveredSeeds_mx );
                    if( replSettings.discoveredSeeds.size() > 0 ) {
                        for (set<string>::iterator i = replSettings.discoveredSeeds.begin();
                                i != replSettings.discoveredSeeds.end();
                                i++) {
                            try {
                                configs.push_back( ReplSetConfig(HostAndPort(*i)) );
                            }
                            catch( DBException& ) {
                                // a discovered seed that fails to load is dropped
                                log(1) << "replSet exception trying to load config from discovered seed " << *i << rsLog;
                                replSettings.discoveredSeeds.erase(*i);
                            }
                        }
                    }
                }

                if (!replSettings.reconfig.isEmpty()) {
                    try {
                        // forced reconfig supplied on the command line
                        configs.push_back(ReplSetConfig(replSettings.reconfig, true));
                    }
                    catch( DBException& re) {
                        log() << "replSet couldn't load reconfig: " << re.what() << rsLog;
                        replSettings.reconfig = BSONObj();
                    }
                }

                int nok = 0;
                int nempty = 0;
                for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
                    if( i->ok() )
                        nok++;
                    if( i->empty() )
                        nempty++;
                }
                if( nok == 0 ) {

                    if( nempty == (int) configs.size() ) {
                        // all sources were empty: the set has apparently never been initiated
                        startupStatus = EMPTYCONFIG;
                        startupStatusMsg.set("can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)");
                        log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog;
                        static unsigned once;
                        if( ++once == 1 ) {
                            log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog;
                        }
                        if( _seeds->size() == 0 ) {
                            LOG(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog;
                        }
                    }
                    else {
                        // configs exist somewhere but none could be fetched yet
                        startupStatus = EMPTYUNREACHABLE;
                        startupStatusMsg.set("can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)");
                        log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog;
                    }

                    sleepsecs(10);
                    continue;
                }

                if( !_loadConfigFinish(configs) ) {
                    log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog;
                    sleepsecs(20);
                    continue;
                }
            }
            catch(DBException& e) {
                startupStatus = BADCONFIG;
                startupStatusMsg.set("replSet error loading set config (BADCONFIG)");
                log() << "replSet error loading configurations " << e.toString() << rsLog;
                log() << "replSet error replication will not start" << rsLog;
                sethbmsg("error loading set config");
                _fatal();
                throw;
            }
            break;
        }
        startupStatusMsg.set("? started");
        startupStatus = STARTED;
    }
+
    /* enter the FATAL state: replication activity stops until the process restarts */
    void ReplSetImpl::_fatal() {
        box.set(MemberState::RS_FATAL, 0);
        log() << "replSet error fatal, stopping replication" << rsLog;
    }
+
    /* adopt a new configuration: persist it locally, then re-initialize from it.
       note the save happens BEFORE initFromConfig -- see the note there.
       @param addComment if true, record a "Reconfig set" comment with the saved config */
    void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
        bo comment;
        if( addComment )
            comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );

        newConfig.saveConfigLocally(comment);

        try {
            if (initFromConfig(newConfig, true)) {
                log() << "replSet replSetReconfig new config saved locally" << rsLog;
            }
        }
        catch(DBException& e) {
            if( e.getCode() == 13497 /* removed from set */ ) {
                // we were removed from the set: shut down cleanly
                cc().shutdown();
                dbexit( EXIT_CLEAN , "removed from replica set" ); // never returns
                assert(0);
            }
            log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog;
            _fatal();
        }
        catch(...) {
            log() << "replSet error unexpected exception in haveNewConfig()" << rsLog;
            _fatal();
        }
    }
+
+ void Manager::msgReceivedNewConfig(BSONObj o) {
+ log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog;
+ ReplSetConfig c(o);
+ if( c.version > rs->config().version )
+ theReplSet->haveNewConfig(c, false);
+ else {
+ log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
+ c.version << ' ' << rs->config().version << rsLog;
+ }
+ }
+
    /* forked as a thread during startup
       it can run quite a while looking for config. but once found,
       a separate thread takes over as ReplSetImpl::Manager, and this thread
       terminates.
    */
    void startReplSets(ReplSetCmdline *replSetCmdline) {
        Client::initThread("rsStart");
        try {
            assert( theReplSet == 0 );
            if( replSetCmdline == 0 ) {
                // --replSet was not given; nothing to do
                assert(!replSet);
                return;
            }
            replLocalAuth();
            (theReplSet = new ReplSet(*replSetCmdline))->go();
        }
        catch(std::exception& e) {
            log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
            if( theReplSet )
                theReplSet->fatal();
        }
        cc().shutdown();
    }
+
+ void replLocalAuth() {
+ if ( noauth )
+ return;
+ cc().getAuthenticationInfo()->authorize("local","_repl");
+ }
+
+
+}
+
namespace boost {

    /* invoked by boost on a failed BOOST_ASSERT when BOOST_ENABLE_ASSERT_HANDLER
       is defined.  this implementation only logs; execution continues past the
       failed assertion.
       NOTE(review): not aborting here looks deliberate (avoid taking the server
       down for a library assert) but verify -- most assertion handlers terminate. */
    void assertion_failed(char const * expr, char const * function, char const * file, long line) {
        mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl;
    }

}
diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h
new file mode 100644
index 00000000000..8e43204be3b
--- /dev/null
+++ b/src/mongo/db/repl/rs.h
@@ -0,0 +1,667 @@
+// /db/repl/rs.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../../util/concurrency/list.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/net/hostandport.h"
+#include "../commands.h"
+#include "../oplog.h"
+#include "../oplogreader.h"
+#include "rs_exception.h"
+#include "rs_optime.h"
+#include "rs_member.h"
+#include "rs_config.h"
+
+/**
+ * Order of Events
+ *
+ * On startup, if the --replSet option is present, startReplSets is called.
+ * startReplSets forks off a new thread for replica set activities. It creates
+ * the global theReplSet variable and calls go() on it.
+ *
+ * theReplSet's constructor changes the replica set's state to RS_STARTUP,
+ * starts the replica set manager, and loads the config (if the replica set
+ * has been initialized).
+ */
+
+namespace mongo {
+
+ struct HowToFixUp;
+ struct Target;
+ class DBClientConnection;
+ class ReplSetImpl;
+ class OplogReader;
+ extern bool replSet; // true if using repl sets
+ extern class ReplSet *theReplSet; // null until initialized
+ extern Tee *rsLog;
+
    /* member of a replica set: pairs the member's static configuration with the
       latest heartbeat-derived view of its health and state. */
    class Member : public List1<Member>::Base {
    private:
        ~Member(); // intentionally unimplemented as should never be called -- see List1<>::Base.
        Member(const Member&);
    public:
        Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self);

        string fullName() const { return h().toString(); }
        // static configuration (host, priority, arbiterOnly, ...) for this member
        const ReplSetConfig::MemberCfg& config() const { return _config; }
        ReplSetConfig::MemberCfg& configw() { return _config; }
        // dynamic, heartbeat-derived info (health, state, last heartbeat message)
        const HeartbeatInfo& hbinfo() const { return _hbinfo; }
        HeartbeatInfo& get_hbinfo() { return _hbinfo; }
        string lhb() const { return _hbinfo.lastHeartbeatMsg; }
        MemberState state() const { return _hbinfo.hbstate; }
        const HostAndPort& h() const { return _h; }
        unsigned id() const { return _hbinfo.id(); }

        bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0
        // append an html summary of this member to s (used by the diagnostics page)
        void summarizeMember(stringstream& s) const;

    private:
        friend class ReplSetImpl;
        ReplSetConfig::MemberCfg _config;
        const HostAndPort _h;
        HeartbeatInfo _hbinfo;
    };
+
    namespace replset {
        /**
         * "Normal" replica set syncing: applies oplog entries via syncApply.
         */
        class SyncTail : public Sync {
        public:
            virtual ~SyncTail() {}
            SyncTail(const string& host) : Sync(host) {}
            virtual bool syncApply(const BSONObj &o);
        };

        /**
         * Initial clone and sync
         */
        class InitialSync : public SyncTail {
        public:
            InitialSync(const string& host) : SyncTail(host) {}
            virtual ~InitialSync() {}
            // apply ops from r (fetched from source) between applyGTE and minValid
            bool oplogApplication(OplogReader& r, const Member* source, const OpTime& applyGTE, const OpTime& minValid);
            virtual void applyOp(const BSONObj& o, const OpTime& minvalid);
        };

        // TODO: move hbmsg into an error-keeping class (SERVER-4444)
        void sethbmsg(const string& s, const int logLevel=0);

    } // namespace replset
+
    /* background task that reacts to set-state changes: watches for remote
       primaries, maintains the electable set, and handles new configs. */
    class Manager : public task::Server {
        ReplSetImpl *rs;
        bool busyWithElectSelf;
        int _primary;

        /** @param two - if true two primaries were seen. this can happen transiently, in addition to our
            polling being only occasional. in this case null is returned, but the caller should
            not assume primary itself in that situation.
        */
        const Member* findOtherPrimary(bool& two);

        void noteARemoteIsPrimary(const Member *);
        void checkElectableSet();
        void checkAuth();
        virtual void starting();
    public:
        Manager(ReplSetImpl *rs);
        virtual ~Manager();
        void msgReceivedNewConfig(BSONObj);
        void msgCheckNewState();
    };
+
    /* background task that fakes ("ghosts") replication progress reports for
       slaves that sync from us rather than directly from the primary. */
    class GhostSync : public task::Server {
        struct GhostSlave : boost::noncopyable {
            GhostSlave() : last(0), slave(0), init(false) { }
            OplogReader reader;  // connection used to impersonate the slave upstream
            OpTime last;         // last optime reported for this slave
            Member* slave;
            bool init;           // true once associateSlave() has resolved the member
        };
        /**
         * This is a cache of ghost slaves
         */
        typedef map< mongo::OID,shared_ptr<GhostSlave> > MAP;
        MAP _ghostCache;
        RWLock _lock; // protects _ghostCache
        ReplSetImpl *rs;
        virtual void starting();
    public:
        GhostSync(ReplSetImpl *_rs) : task::Server("rsGhostSync"), _lock("GhostSync"), rs(_rs) {}
        ~GhostSync() {
            log() << "~GhostSync() called" << rsLog;
        }

        /**
         * Replica sets can sync in a hierarchical fashion, which throws off w
         * calculation on the master. percolate() faux-syncs from an upstream
         * node so that the primary will know what the slaves are up to.
         *
         * We can't just directly sync to the primary because it could be
         * unreachable, e.g., S1--->S2--->S3--->P.  S2 should ghost sync from S3
         * and S3 can ghost sync from the primary.
         *
         * Say we have an S1--->S2--->P situation and this node is S2.  rid
         * would refer to S1.  S2 would create a ghost slave of S1 and connect
         * it to P (_currentSyncTarget). Then it would use this connection to
         * pretend to be S1, replicating off of P.
         */
        void percolate(const BSONObj& rid, const OpTime& last);
        void associateSlave(const BSONObj& rid, const int memberId);
        void updateSlave(const mongo::OID& id, const OpTime& last);
    };
+
+ struct Target;
+
    /* election machinery: vote accounting, freshness checks, and self-election */
    class Consensus {
        ReplSetImpl &rs;
        // the most recent "yes" vote we granted, to avoid double-voting within a window
        struct LastYea {
            LastYea() : when(0), who(0xffffffff) { }
            time_t when;
            unsigned who;
        };
        static SimpleMutex lyMutex;
        Guarded<LastYea,lyMutex> ly;
        unsigned yea(unsigned memberId); // throws VoteException
        void electionFailed(unsigned meid);
        void _electSelf();
        bool weAreFreshest(bool& allUp, int& nTies);
        bool sleptLast; // slept last elect() pass
    public:
        Consensus(ReplSetImpl *t) : rs(*t) {
            sleptLast = false;
            steppedDown = 0;
        }

        /* if we've stepped down, this is when we are allowed to try to elect ourself again.
           todo: handle possible weirdnesses at clock skews etc.
        */
        time_t steppedDown;

        int totalVotes() const;
        bool aMajoritySeemsToBeUp() const;
        bool shouldRelinquish() const;
        void electSelf();
        void electCmdReceived(BSONObj, BSONObjBuilder*);
        void multiCommand(BSONObj cmd, list<Target>& L);
    };
+
    /**
     * most operations on a ReplSet object should be done while locked. that
     * logic implemented here.
     *
     * Order of locking: lock the replica set, then take a rwlock.
     */
    class RSBase : boost::noncopyable {
    public:
        const unsigned magic;
        void assertValid() { assert( magic == 0x12345677 ); }
    private:
        mongo::mutex m;
        int _locked;                      // lock depth, for asserts only
        ThreadLocalValue<bool> _lockedByMe; // per-thread: do WE hold the lock?
    protected:
        RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
        ~RSBase() {
            /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */
            log() << "replSet ~RSBase called" << rsLog;
        }

    public:
        /* scoped lock on the replica set; re-entrant within a single thread
           (re-locking by the owner is a no-op). */
        class lock {
            RSBase& rsbase;
            auto_ptr<scoped_lock> sl;
        public:
            lock(RSBase* b) : rsbase(*b) {
                if( rsbase._lockedByMe.get() )
                    return; // recursive is ok...

                sl.reset( new scoped_lock(rsbase.m) );
                DEV assert(rsbase._locked == 0);
                rsbase._locked++;
                rsbase._lockedByMe.set(true);
            }
            ~lock() {
                // only release if this object actually acquired (non-recursive case)
                if( sl.get() ) {
                    assert( rsbase._lockedByMe.get() );
                    DEV assert(rsbase._locked == 1);
                    rsbase._lockedByMe.set(false);
                    rsbase._locked--;
                }
            }
        };

        /* for asserts */
        bool locked() const { return _locked != 0; }

        /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another
           just for asserts & such so we can make the contracts clear on who locks what when.
           we don't use these locks that frequently, so the little bit of overhead is fine.
        */
        bool lockedByMe() { return _lockedByMe.get(); }
    };
+
+ class ReplSetHealthPollTask;
+
    /* safe container for our state that keeps member pointer and state variables always aligned */
    class StateBox : boost::noncopyable {
    public:
        struct SP { // SP is like pair<MemberState,const Member *> but nicer
            SP() : state(MemberState::RS_STARTUP), primary(0) { }
            MemberState state;
            const Member *primary;
        };
        // snapshot of state + primary, taken atomically under the read lock
        const SP get() {
            rwlock lk(m, false);
            return sp;
        }
        MemberState getState() const {
            rwlock lk(m, false);
            return sp.state;
        }
        const Member* getPrimary() const {
            rwlock lk(m, false);
            return sp.primary;
        }
        /* set our state to s; keep the primary pointer consistent: if s is a
           primary state, 'self' becomes primary, otherwise a stale self-primary
           pointer is cleared */
        void change(MemberState s, const Member *self) {
            rwlock lk(m, true);
            if( sp.state != s ) {
                log() << "replSet " << s.toString() << rsLog;
            }
            sp.state = s;
            if( s.primary() ) {
                sp.primary = self;
            }
            else {
                if( self == sp.primary )
                    sp.primary = 0;
            }
        }
        // set state and primary unconditionally, with no consistency fixups
        void set(MemberState s, const Member *p) {
            rwlock lk(m, true);
            sp.state = s;
            sp.primary = p;
        }
        void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); }
        // record that another member is primary; only legal when we are not primary ourselves
        void setOtherPrimary(const Member *mem) {
            rwlock lk(m, true);
            assert( !sp.state.primary() );
            sp.primary = mem;
        }
        void noteRemoteIsPrimary(const Member *remote) {
            rwlock lk(m, true);
            if( !sp.state.secondary() && !sp.state.fatal() )
                sp.state = MemberState::RS_RECOVERING;
            sp.primary = remote;
        }
        StateBox() : m("StateBox") { }
    private:
        RWLock m; // guards sp
        SP sp;
    };
+
+ void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet );
+
    /** Parameter given to the --replSet command line option (parsed).
        Syntax is "<setname>/<seedhost1>,<seedhost2>"
        where setname is a name and seedhost is "<host>[:<port>]" */
    class ReplSetCmdline {
    public:
        // throws (via uassert in parseReplsetCmdLine) on a malformed config string
        ReplSetCmdline(string cfgString) { parseReplsetCmdLine(cfgString, setname, seeds, seedSet); }
        string setname;
        vector<HostAndPort> seeds;   // seed hosts excluding ourself
        set<HostAndPort> seedSet;    // all parsed seed hosts, for duplicate detection
    };
+
+ /* information about the entire repl set, such as the various servers in the set, and their state */
+ /* note: We currently do not free mem when the set goes away - it is assumed the replset is a
+ singleton and long lived.
+ */
+    class ReplSetImpl : protected RSBase {
+    public:
+        /** info on our state if the replset isn't yet "up".  for example, if we are pre-initiation. */
+        enum StartupStatus {
+            PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3,
+            EMPTYUNREACHABLE=4, STARTED=5, SOON=6
+        };
+        static StartupStatus startupStatus;
+        static DiagStr startupStatusMsg;
+        static string stateAsHtml(MemberState state);
+
+        /* todo thread */
+        void msgUpdateHBInfo(HeartbeatInfo);
+
+        StateBox box; // our state + who we think is primary, kept consistent under a lock
+
+        OpTime lastOpTimeWritten;
+        long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
+    private:
+        set<ReplSetHealthPollTask*> healthTasks; // one poll task per remote member
+        void endOldHealthTasks();
+        void startHealthTaskFor(Member *m);
+
+        Consensus elect; // election machinery
+        void relinquish();
+        void forgetPrimary();
+    protected:
+        bool _stepDown(int secs);
+        bool _freeze(int secs);
+    private:
+        void assumePrimary();
+        void loadLastOpTimeWritten(bool quiet=false);
+        void changeState(MemberState s);
+
+        /**
+         * Find the closest member (using ping time) with a higher latest optime.
+         */
+        Member* getMemberToSyncTo();
+        void veto(const string& host, unsigned secs=10);
+        Member* _currentSyncTarget;
+
+        bool _blockSync; // when true, refuse to sync (see blockSync)
+        void blockSync(bool block);
+
+        // set of electable members' _ids
+        set<unsigned> _electableSet;
+    protected:
+        // "heartbeat message"
+        // sent in requestHeartbeat respond in field "hbm"
+        char _hbmsg[256]; // we change this unlocked, thus not an stl::string
+        time_t _hbmsgTime; // when it was logged
+    public:
+        void sethbmsg(string s, int logLevel = 0);
+
+        /**
+         * Election with Priorities
+         *
+         * Each node (n) keeps a set of nodes that could be elected primary.
+         * Each node in this set:
+         *
+         *      1. can connect to a majority of the set
+         *      2. has a priority greater than 0
+         *      3. has an optime within 10 seconds of the most up-to-date node
+         *         that n can reach
+         *
+         * If a node fails to meet one or more of these criteria, it is removed
+         * from the list.  This list is updated whenever the node receives a
+         * heartbeat.
+         *
+         * When a node sends an "am I freshest?" query, the node receiving the
+         * query checks their electable list to make sure that no one else is
+         * electable AND higher priority.  If this check passes, the node will
+         * return an "ok" response, if not, it will veto.
+         *
+         * If a node is primary and there is another node with higher priority
+         * on the electable list (i.e., it must be synced to within 10 seconds
+         * of the current primary), the node (or nodes) with connections to both
+         * the primary and the secondary with higher priority will issue
+         * replSetStepDown requests to the primary to allow the higher-priority
+         * node to take over.
+         */
+        void addToElectable(const unsigned m) { lock lk(this); _electableSet.insert(m); }
+        void rmFromElectable(const unsigned m) { lock lk(this); _electableSet.erase(m); }
+        bool iAmElectable() { lock lk(this); return _electableSet.find(_self->id()) != _electableSet.end(); }
+        bool isElectable(const unsigned id) { lock lk(this); return _electableSet.find(id) != _electableSet.end(); }
+        Member* getMostElectable();
+    protected:
+        /**
+         * Load a new config as the replica set's main config.
+         *
+         * If there is a "simple" change (just adding a node), this shortcuts
+         * the config. Returns true if the config was changed.  Returns false
+         * if the config doesn't include this node.  Throws an exception if
+         * something goes very wrong.
+         *
+         * Behavior to note:
+         *  - locks this
+         *  - intentionally leaks the old _cfg and any old _members (if the
+         *    change isn't strictly additive)
+         */
+        bool initFromConfig(ReplSetConfig& c, bool reconf=false);
+        void _fillIsMaster(BSONObjBuilder&);
+        void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
+        const ReplSetConfig& config() { return *_cfg; }
+        string name() const { return _name; } /* @return replica set's logical name */
+        MemberState state() const { return box.getState(); }
+        void _fatal();
+        void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
+        void _summarizeAsHtml(stringstream&) const;
+        void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command
+
+        /* throws exception if a problem initializing. */
+        ReplSetImpl(ReplSetCmdline&);
+
+        /* call after constructing to start - returns fairly quickly after launching its threads */
+        void _go();
+
+    private:
+        string _name;
+        const vector<HostAndPort> *_seeds;
+        ReplSetConfig *_cfg;
+
+        /**
+         * Finds the configuration with the highest version number and attempts
+         * load it.
+         */
+        bool _loadConfigFinish(vector<ReplSetConfig>& v);
+        /**
+         * Gather all possible configs (from command line seeds, our own config
+         * doc, and any hosts listed therein) and try to initiate from the most
+         * recent config we find.
+         */
+        void loadConfig();
+
+        list<HostAndPort> memberHostnames() const;
+        const ReplSetConfig::MemberCfg& myConfig() const { return _config; }
+        bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
+        bool iAmPotentiallyHot() const {
+          return myConfig().potentiallyHot() && // not an arbiter
+            elect.steppedDown <= time(0) && // not stepped down/frozen
+            state() == MemberState::RS_SECONDARY; // not stale
+        }
+    protected:
+        Member *_self;
+        bool _buildIndexes;       // = _self->config().buildIndexes
+        void setSelfTo(Member *); // use this as it sets buildIndexes var
+    private:
+        List1<Member> _members; // all members of the set EXCEPT _self.
+        ReplSetConfig::MemberCfg _config; // config of _self
+        unsigned _id; // _id of _self
+
+        int _maintenanceMode; // if we should stay in recovering state
+    public:
+        // this is called from within a writelock in logOpRS
+        unsigned selfId() const { return _id; }
+        Manager *mgr;
+        GhostSync *ghost;
+        /**
+         * This forces a secondary to go into recovering state and stay there
+         * until this is called again, passing in "false".  Multiple threads can
+         * call this and it will leave maintenance mode once all of the callers
+         * have called it again, passing in false.
+         */
+        void setMaintenanceMode(const bool inc);
+    private:
+        Member* head() const { return _members.head(); }
+    public:
+        const Member* findById(unsigned id) const;
+    private:
+        void _getTargets(list<Target>&, int &configVersion);
+        void getTargets(list<Target>&, int &configVersion);
+        void startThreads();
+        friend class FeedbackThread;
+        friend class CmdReplSetElect;
+        friend class Member;
+        friend class Manager;
+        friend class GhostSync;
+        friend class Consensus;
+
+    private:
+        bool initialSyncOplogApplication(const OpTime& applyGTE, const OpTime& minValid);
+        void _syncDoInitialSync();
+        void syncDoInitialSync();
+        void _syncThread();
+        bool tryToGoLiveAsASecondary(OpTime&); // readlocks
+        void syncTail();
+        unsigned _syncRollback(OplogReader& r);
+        void syncRollback(OplogReader& r);
+        void syncFixUp(HowToFixUp& h, OplogReader& r);
+
+        // get an oplog reader for a server with an oplog entry timestamp greater
+        // than or equal to minTS, if set.
+        Member* _getOplogReader(OplogReader& r, const OpTime& minTS);
+
+        // check lastOpTimeWritten against the remote's earliest op, filling in
+        // remoteOldestOp.
+        bool _isStale(OplogReader& r, const OpTime& minTS, BSONObj& remoteOldestOp);
+
+        // keep a list of hosts that we've tried recently that didn't work
+        map<string,time_t> _veto;
+    public:
+        void syncThread();
+        const OpTime lastOtherOpTime() const;
+    };
+
+    // Public facade over ReplSetImpl: thin forwarders that expose the subset of
+    // operations the rest of the server may use.
+    class ReplSet : public ReplSetImpl {
+    public:
+        ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) {  }
+
+        // for the replSetStepDown command
+        bool stepDown(int secs) { return _stepDown(secs); }
+
+        // for the replSetFreeze command
+        bool freeze(int secs) { return _freeze(secs); }
+
+        string selfFullName() {
+            assert( _self );
+            return _self->fullName();
+        }
+
+        bool buildIndexes() const { return _buildIndexes; }
+
+        /* call after constructing to start - returns fairly quickly after launching its threads */
+        void go() { _go(); }
+
+        void fatal() { _fatal(); }
+        bool isPrimary() { return box.getState().primary(); }
+        bool isSecondary() {  return box.getState().secondary(); }
+        MemberState state() const { return ReplSetImpl::state(); }
+        string name() const { return ReplSetImpl::name(); }
+        const ReplSetConfig& config() { return ReplSetImpl::config(); }
+        void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { _getOplogDiagsAsHtml(server_id,ss); }
+        void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss); }
+        void summarizeStatus(BSONObjBuilder& b) const  { _summarizeStatus(b); }
+        void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
+
+        /**
+         * We have a new config (reconfig) - apply it.
+         * @param comment write a no-op comment to the oplog about it.  only
+         * makes sense if one is primary and initiating the reconf.
+         *
+         * The slaves are updated when they get a heartbeat indicating the new
+         * config.  The comment is a no-op.
+         */
+        void haveNewConfig(ReplSetConfig& c, bool comment);
+
+        /**
+         * Pointer assignment isn't necessarily atomic, so this needs to assure
+         * locking, even though we don't delete old configs.
+         */
+        const ReplSetConfig& getConfig() { return config(); }
+
+        bool lockedByMe() { return RSBase::lockedByMe(); }
+
+        // heartbeat msg to send to others; descriptive diagnostic info
+        string hbmsg() const {
+            if( time(0)-_hbmsgTime > 120 ) return ""; // messages older than 2 minutes are considered stale
+            return _hbmsg;
+        }
+    };
+
+ /**
+ * Base class for repl set commands. Checks basic things such if we're in
+ * rs mode before the command does its real work.
+ */
+    class ReplSetCommand : public Command {
+    protected:
+        ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
+        virtual bool slaveOk() const { return true; }
+        virtual bool adminOnly() const { return true; }
+        virtual bool logTheOp() { return false; }
+        virtual LockType locktype() const { return NONE; }
+        virtual void help( stringstream &help ) const { help << "internal"; }
+
+        /**
+         * Some replica set commands call this and then call check(). This is
+         * intentional, as they might do things before theReplSet is initialized
+         * that still need to be checked for auth.
+         */
+        bool checkAuth(string& errmsg, BSONObjBuilder& result) {
+            if( !noauth ) {
+                AuthenticationInfo *ai = cc().getAuthenticationInfo();
+                if (!ai->isAuthorizedForLock("admin", locktype())) {
+                    errmsg = "replSet command unauthorized";
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Preconditions shared by all replset commands: running with --replSet,
+         * theReplSet initialized, and the caller authorized.  On failure fills
+         * errmsg/result with diagnostics and returns false.
+         */
+        bool check(string& errmsg, BSONObjBuilder& result) {
+            if( !replSet ) {
+                errmsg = "not running with --replSet";
+                if( cmdLine.configsvr ) {
+                    result.append("info", "configsvr"); // for shell prompt
+                }
+                return false;
+            }
+
+            if( theReplSet == 0 ) {
+                result.append("startupStatus", ReplSet::startupStatus);
+                errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg.get();
+                // named enumerator instead of the magic number 3
+                if( ReplSet::startupStatus == ReplSet::EMPTYCONFIG )
+                    result.append("info", "run rs.initiate(...) if not yet done for the set");
+                return false;
+            }
+
+            return checkAuth(errmsg, result);
+        }
+    };
+
+ /**
+ * does local authentication
+ * directly authorizes against AuthenticationInfo
+ */
+ void replLocalAuth();
+
+ /** inlines ----------------- */
+
+    // Member ctor: copies the member's config and records its ordinal for
+    // heartbeat bookkeeping.
+    // NOTE(review): c is dereferenced in the initializer list (_config(*c))
+    // before the assert below runs, so the assert fires too late to guard a
+    // null pointer -- confirm callers never pass null.
+    inline Member::Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self) :
+        _config(*c), _h(h), _hbinfo(ord) {
+        assert(c);
+        if( self )
+            _hbinfo.health = 1.0; // we always consider ourselves healthy
+    }
+
+}
diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp
new file mode 100644
index 00000000000..22137773aec
--- /dev/null
+++ b/src/mongo/db/repl/rs_config.cpp
@@ -0,0 +1,662 @@
+// rs_config.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "../../client/dbclient.h"
+#include "../../client/syncclusterconnection.h"
+#include "../../util/net/hostandport.h"
+#include "../dbhelpers.h"
+#include "connections.h"
+#include "../oplog.h"
+#include "../instance.h"
+#include "../../util/text.h"
+#include <boost/algorithm/string.hpp>
+
+using namespace bson;
+
+namespace mongo {
+
+ void logOpInitiate(const bo&);
+
+    /** Verify that every field present in o is listed in the whitelist
+        'fields'; uasserts (13434) on the first unexpected field name. */
+    void assertOnlyHas(BSONObj o, const set<string>& fields) {
+        for( BSONObj::iterator it(o); it.more(); ) {
+            BSONElement elem = it.next();
+            if( fields.count( elem.fieldName() ) == 0 ) {
+                uasserted(13434, str::stream() << "unexpected field '" << elem.fieldName() << "' in object");
+            }
+        }
+    }
+
+    /** @return the host:port of every configured member except ourselves */
+    list<HostAndPort> ReplSetConfig::otherMemberHostnames() const {
+        list<HostAndPort> others;
+        for( unsigned i = 0; i < members.size(); i++ ) {
+            const MemberCfg& cfg = members[i];
+            if( !cfg.h.isSelf() )
+                others.push_back(cfg.h);
+        }
+        return others;
+    }
+
+ /* comment MUST only be set when initiating the set by the initiator */
+    // Persist this config to local.system.replset (as a singleton document),
+    // flushing files before and after the write.  If 'comment' is non-empty and
+    // we are (or are becoming) primary, a no-op comment is logged to the oplog.
+    void ReplSetConfig::saveConfigLocally(bo comment) {
+        checkRsConfig();
+        log() << "replSet info saving a newer config version to local.system.replset" << rsLog;
+        {
+            writelock lk("");
+            Client::Context cx( rsConfigNs );
+            cx.db()->flushFiles(true);
+
+            //theReplSet->lastOpTimeWritten = ??;
+            //rather than above, do a logOp()? probably
+            BSONObj o = asBson();
+            Helpers::putSingletonGod(rsConfigNs.c_str(), o, false/*logOp=false; local db so would work regardless...*/);
+            if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) )
+                logOpInitiate(comment);
+
+            cx.db()->flushFiles(true);
+        }
+        log() << "replSet saveConfigLocally done" << rsLog;
+    }
+
+    /** Serialize this member's config to BSON.  Fields that still hold their
+        default value are omitted so the output stays minimal. */
+    bo ReplSetConfig::MemberCfg::asBson() const {
+        bob result;
+        result << "_id" << _id;
+        result.append("host", h.dynString());
+        if( votes != 1 ) result << "votes" << votes;
+        if( priority != 1.0 ) result << "priority" << priority;
+        if( arbiterOnly ) result << "arbiterOnly" << true;
+        if( slaveDelay ) result << "slaveDelay" << slaveDelay;
+        if( hidden ) result << "hidden" << hidden;
+        if( !buildIndexes ) result << "buildIndexes" << buildIndexes;
+        if( !tags.empty() ) {
+            BSONObjBuilder tagBuilder;
+            map<string,string>::const_iterator it;
+            for( it = tags.begin(); it != tags.end(); ++it )
+                tagBuilder.append(it->first, it->second);
+            result.append("tags", tagBuilder.done());
+        }
+        return result.obj();
+    }
+
+    /** Copy each config member's tag-subgroup set onto the matching live
+        Member (matched by full host name) in dest. */
+    void ReplSetConfig::updateMembers(List1<Member> &dest) {
+        vector<MemberCfg>::iterator cfg;
+        for (cfg = members.begin(); cfg != members.end(); ++cfg) {
+            for( Member *node = dest.head(); node; node = node->next() ) {
+                if (node->fullName() == cfg->h.toString()) {
+                    node->configw().groupsw() = cfg->groups();
+                }
+            }
+        }
+    }
+
+    // Serialize the whole replica set config (_id, version, members, and --
+    // when non-default -- the settings sub-document) to BSON.
+    bo ReplSetConfig::asBson() const {
+        bob b;
+        b.append("_id", _id).append("version", version);
+
+        BSONArrayBuilder a;
+        for( unsigned i = 0; i < members.size(); i++ )
+            a.append( members[i].asBson() );
+        b.append("members", a.arr());
+
+        // only emit "settings" when something in it differs from the defaults
+        if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() || !rules.empty()) {
+            bob settings;
+            if( !rules.empty() ) {
+                bob modes;
+                for (map<string,TagRule*>::const_iterator it = rules.begin(); it != rules.end(); it++) {
+                    bob clauses;
+                    vector<TagClause*> r = (*it).second->clauses;
+                    for (vector<TagClause*>::iterator it2 = r.begin(); it2 < r.end(); it2++) {
+                        clauses << (*it2)->name << (*it2)->target;
+                    }
+                    modes << (*it).first << clauses.obj();
+                }
+                settings << "getLastErrorModes" << modes.obj();
+            }
+            if( !getLastErrorDefaults.isEmpty() )
+                settings << "getLastErrorDefaults" << getLastErrorDefaults;
+            b << "settings" << settings.obj();
+        }
+
+        return b.obj();
+    }
+
+    // uassert wrapper so all basic MemberCfg validity checks share code 13126
+    static inline void mchk(bool expr) {
+        uassert(13126, "bad Member config", expr);
+    }
+
+    // Validate one member's config; uasserts on the first violated constraint.
+    void ReplSetConfig::MemberCfg::check() const {
+        mchk(_id >= 0 && _id <= 255);
+        // NOTE(review): this 0..1000 bound is looser than the 0..100 uassert
+        // two lines below, so 13419 is the effective priority limit -- confirm
+        // whether 1000 here is stale.
+        mchk(priority >= 0 && priority <= 1000);
+        mchk(votes <= 100); // votes >= 0 because it is unsigned
+        uassert(13419, "priorities must be between 0.0 and 100.0", priority >= 0.0 && priority <= 100.0);
+        uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0);
+        uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366);
+        uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden);
+        uassert(13477, "priority must be 0 when buildIndexes=false", buildIndexes || priority == 0);
+    }
+/*
+ string ReplSetConfig::TagSubgroup::toString() const {
+ bool first = true;
+ string result = "\""+name+"\": [";
+ for (set<const MemberCfg*>::const_iterator i = m.begin(); i != m.end(); i++) {
+ if (!first) {
+ result += ", ";
+ }
+ first = false;
+ result += (*i)->h.toString();
+ }
+ return result+"]";
+ }
+ */
+    // Diagnostic stringification of a clause.
+    // NOTE(review): the loop body is commented out, so the walk over subgroups
+    // currently contributes nothing; the placeholder text below is emitted
+    // instead.
+    string ReplSetConfig::TagClause::toString() const {
+        string result = name+": {";
+        for (map<string,TagSubgroup*>::const_iterator i = subgroups.begin(); i != subgroups.end(); i++) {
+//TEMP?            result += (*i).second->toString()+", ";
+        }
+        result += "TagClause toString TEMPORARILY DISABLED";
+        return result + "}";
+    }
+
+    // Diagnostic stringification of a rule: "{<clause>,<clause>,...}".
+    // (The redundant C-style cast on each clause pointer was removed; the
+    // elements are already TagClause*.)
+    string ReplSetConfig::TagRule::toString() const {
+        string result = "{";
+        for (vector<TagClause*>::const_iterator it = clauses.begin(); it != clauses.end(); ++it) {
+            result += (*it)->toString()+",";
+        }
+        return result+"}";
+    }
+
+    // A member of this subgroup reached optime op: advance the subgroup's
+    // high-water mark and propagate to every clause that references it.
+    void ReplSetConfig::TagSubgroup::updateLast(const OpTime& op) {
+        RACECHECK
+        if (last < op) {
+            last = op;
+
+            for (vector<TagClause*>::iterator it = clauses.begin(); it < clauses.end(); it++) {
+                (*it)->updateLast(op);
+            }
+        }
+    }
+
+    // A subgroup of this clause advanced to op: the clause itself advances
+    // only once at least actualTarget subgroups have reached op, and then
+    // notifies its owning rule.
+    void ReplSetConfig::TagClause::updateLast(const OpTime& op) {
+        RACECHECK
+        if (last >= op) {
+            return;
+        }
+
+        // check at least n subgroups greater than clause.last
+        int count = 0;
+        map<string,TagSubgroup*>::iterator it;
+        for (it = subgroups.begin(); it != subgroups.end(); it++) {
+            if ((*it).second->last >= op) {
+                count++;
+            }
+        }
+
+        if (count >= actualTarget) {
+            last = op;
+            rule->updateLast(op);
+        }
+    }
+
+    // Recompute the rule's high-water mark as the minimum of its clauses'
+    // marks (capped by op).  Uses a const-correct pointer instead of the
+    // previous C-style cast that stripped const from &op; 'last' is only ever
+    // read through the pointer, so behavior is unchanged.
+    void ReplSetConfig::TagRule::updateLast(const OpTime& op) {
+        const OpTime *earliest = &op;
+        vector<TagClause*>::const_iterator it;
+
+        for (it = clauses.begin(); it != clauses.end(); ++it) {
+            if ((*it)->last < *earliest) {
+                earliest = &(*it)->last;
+            }
+        }
+
+        // rules are simply and-ed clauses, so whatever the most-behind
+        // clause is at is what the rule is at
+        last = *earliest;
+    }
+
+ /** @param o old config
+ @param n new config
+ */
+ /*static*/
+    // Decide whether replacing config o with config n is a legal reconfig.
+    // Returns false (filling errmsg) for recoverable rejections; uasserts for
+    // per-member violations (changed _id/buildIndexes/arbiterOnly, or
+    // localhost/hostname mixing).
+    bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) {
+        assert( theReplSet );
+
+        if( o._id != n._id ) {
+            errmsg = "set name may not change";
+            return false;
+        }
+        /* TODO : wonder if we need to allow o.version < n.version only, which is more lenient.
+                  if someone had some intermediate config this node doesnt have, that could be
+                  necessary.  but then how did we become primary?  so perhaps we are fine as-is.
+                  */
+        if( o.version >= n.version ) {
+            errmsg = str::stream() << "version number must increase, old: "
+                                   << o.version << " new: " << n.version;
+            return false;
+        }
+
+        // index the old config's members by host for the per-member checks below
+        map<HostAndPort,const ReplSetConfig::MemberCfg*> old;
+        bool isLocalHost = false;
+        for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) {
+            if (i->h.isLocalHost()) {
+                isLocalHost = true;
+            }
+            old[i->h] = &(*i);
+        }
+        int me = 0;
+        for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) {
+            const ReplSetConfig::MemberCfg& m = *i;
+            if ( (isLocalHost && !m.h.isLocalHost()) || (!isLocalHost && m.h.isLocalHost())) {
+                log() << "reconfig error, cannot switch between localhost and hostnames: "
+                      << m.h.toString() << rsLog;
+                uasserted(13645, "hosts cannot switch between localhost and hostname");
+            }
+            if( old.count(m.h) ) {
+                const ReplSetConfig::MemberCfg& oldCfg = *old[m.h];
+                if( oldCfg._id != m._id ) {
+                    log() << "replSet reconfig error with member: " << m.h.toString() << rsLog;
+                    uasserted(13432, "_id may not change for members");
+                }
+                if( oldCfg.buildIndexes != m.buildIndexes ) {
+                    log() << "replSet reconfig error with member: " << m.h.toString() << rsLog;
+                    uasserted(13476, "buildIndexes may not change for members");
+                }
+                /* are transitions to and from arbiterOnly guaranteed safe?  if not, we should disallow here.
+                   there is a test at replsets/replsetarb3.js */
+                if( oldCfg.arbiterOnly != m.arbiterOnly ) {
+                    log() << "replSet reconfig error with member: " << m.h.toString() << " arbiterOnly cannot change. remove and readd the member instead " << rsLog;
+                    uasserted(13510, "arbiterOnly may not change for members");
+                }
+            }
+            if( m.h.isSelf() )
+                me++;
+        }
+
+        uassert(13433, "can't find self in new replset config", me == 1);
+
+        return true;
+    }
+
+    // Reset to the "nothing loaded" state (version -5 is the sentinel used by
+    // the loading constructors before any config has been fetched).
+    void ReplSetConfig::clear() {
+        version = -5;
+        _ok = false;
+    }
+
+    /** Compute and cache the write-majority size for this config: a strict
+        majority of all members, but never more than the number of
+        non-arbiter members. */
+    void ReplSetConfig::setMajority() {
+        const int total = members.size();
+        const int strictMajority = total / 2 + 1;
+        int nonArbiters = total;
+
+        vector<MemberCfg>::iterator it;
+        for (it = members.begin(); it != members.end(); ++it) {
+            if (it->arbiterOnly) {
+                nonArbiters--;
+            }
+        }
+
+        // e.g. with 4 arbiters and 3 normal members the "majority" must be
+        // all of the normal members, not 4
+        _majority = (strictMajority > nonArbiters) ? nonArbiters : strictMajority;
+    }
+
+    // @return the majority size cached by setMajority()
+    int ReplSetConfig::getMajority() const {
+        return _majority;
+    }
+
+    // Sanity-check the whole config; uasserts on the first violation.
+    // Enforces: set name matches --replSet, version > 0, 1..12 members, and
+    // 1..7 voting members.
+    void ReplSetConfig::checkRsConfig() const {
+        uassert(13132,
+                str::stream() << "nonmatching repl set name in _id field: " << _id << " vs. " << cmdLine.ourSetName(),
+                _id == cmdLine.ourSetName());
+        uassert(13308, "replSet bad config version #", version > 0);
+        uassert(13133, "replSet bad config no members", members.size() >= 1);
+        uassert(13309, "replSet bad config maximum number of members is 12", members.size() <= 12);
+        {
+            unsigned voters = 0;
+            for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); ++i ) {
+                if( i->votes )
+                    voters++;
+            }
+            uassert(13612, "replSet bad config maximum number of voting members is 7", voters <= 7);
+            uassert(13613, "replSet bad config no voting members", voters > 0);
+        }
+    }
+
+    // Build tagMap: for every tag label, a clause whose subgroups partition
+    // the members by that label's value.  E.g.:
+    //
+    //     A is tagged with {"server" : "A", "dc" : "ny"}
+    //     B is tagged with {"server" : "B", "dc" : "ny"}
+    //
+    // At the end of this step, tagMap will contain:
+    //
+    //     "server" => {"A" : [A], "B" : [B]}
+    //     "dc" => {"ny" : [A,B]}
+    //
+    // Fix: bind 'member' by reference instead of copying each MemberCfg (and
+    // its tag map) per iteration; the iterated tags now belong to the same
+    // object whose address is inserted into the subgroup.
+    void ReplSetConfig::_populateTagMap(map<string,TagClause> &tagMap) {
+        for (unsigned i=0; i<members.size(); i++) {
+            MemberCfg& member = members[i];
+
+            for (map<string,string>::iterator tag = member.tags.begin(); tag != member.tags.end(); tag++) {
+                string label = (*tag).first;
+                string value = (*tag).second;
+
+                TagClause& clause = tagMap[label];
+                clause.name = label;
+
+                TagSubgroup* subgroup;
+                // search for "ny" in "dc"'s clause
+                if (clause.subgroups.find(value) == clause.subgroups.end()) {
+                    clause.subgroups[value] = subgroup = new TagSubgroup(value);
+                }
+                else {
+                    subgroup = clause.subgroups[value];
+                }
+
+                subgroup->m.insert(&member);
+            }
+        }
+    }
+
+    // Parse the settings.getLastErrorModes sub-document into TagRule objects
+    // stored in this->rules.  Each mode is a rule; each "<tag> : <count>" pair
+    // inside it is a clause over the subgroups built by _populateTagMap.
+    // NOTE(review): the TagRule/TagClause objects allocated with new here are
+    // never freed in this function -- presumably intentional since the config
+    // lives for the process lifetime; confirm.
+    void ReplSetConfig::parseRules(const BSONObj& modes) {
+        map<string,TagClause> tagMap;
+        _populateTagMap(tagMap);
+
+        for (BSONObj::iterator i = modes.begin(); i.more(); ) {
+            unsigned int primaryOnly = 0;
+
+            // ruleName : {dc : 2, m : 3}
+            BSONElement rule = i.next();
+            uassert(14046, "getLastErrorMode rules must be objects", rule.type() == mongo::Object);
+
+            TagRule* r = new TagRule();
+
+            BSONObj clauseObj = rule.Obj();
+            for (BSONObj::iterator c = clauseObj.begin(); c.more(); ) {
+                BSONElement clauseElem = c.next();
+                uassert(14829, "getLastErrorMode criteria must be numeric", clauseElem.isNumber());
+
+                // get the clause, e.g., "x.y" : 3
+                const char *criteria = clauseElem.fieldName();
+                int value = clauseElem.numberInt();
+                uassert(14828, str::stream() << "getLastErrorMode criteria must be greater than 0: " << clauseElem, value > 0);
+
+                TagClause* node = new TagClause(tagMap[criteria]);
+
+                int numGroups = node->subgroups.size();
+                uassert(14831, str::stream() << "mode " << clauseObj << " requires "
+                        << value << " tagged with " << criteria << ", but only "
+                        << numGroups << " with this tag were found", numGroups >= value);
+
+                node->name = criteria;
+                node->target = value;
+                // if any subgroups contain "me", we can decrease the target
+                node->actualTarget = node->target;
+
+                // then we want to add pointers between clause & subgroup
+                for (map<string,TagSubgroup*>::iterator sgs = node->subgroups.begin();
+                     sgs != node->subgroups.end(); sgs++) {
+                    bool foundMe = false;
+                    (*sgs).second->clauses.push_back(node);
+
+                    // if this subgroup contains the primary, it's automatically always up-to-date
+                    for( set<MemberCfg*>::const_iterator cfg = (*sgs).second->m.begin();
+                         cfg != (*sgs).second->m.end();
+                         cfg++)
+                    {
+                        if ((*cfg)->h.isSelf()) {
+                            node->actualTarget--;
+                            foundMe = true;
+                        }
+                    }
+
+                    // register the subgroup with every member in it (unless we
+                    // already counted ourselves above)
+                    for (set<MemberCfg *>::iterator cfg = (*sgs).second->m.begin();
+                         !foundMe && cfg != (*sgs).second->m.end(); cfg++) {
+                        (*cfg)->groupsw().insert((*sgs).second);
+                    }
+                }
+
+                // if all of the members of this clause involve the primary, it's always up-to-date
+                if (node->actualTarget == 0) {
+                    node->last = OpTime(INT_MAX, INT_MAX);
+                    primaryOnly++;
+                }
+
+                // this is a valid clause, so we want to add it to its rule
+                node->rule = r;
+                r->clauses.push_back(node);
+            }
+
+            // if all of the clauses are satisfied by the primary, this rule is trivially true
+            if (primaryOnly == r->clauses.size()) {
+                r->last = OpTime(INT_MAX, INT_MAX);
+            }
+
+            // if we got here, this is a valid rule
+            LOG(1) << "replSet new rule " << rule.fieldName() << ": " << r->toString() << rsLog;
+            rules[rule.fieldName()] = r;
+        }
+    }
+
+    // Populate this config from a BSON config document: top-level fields,
+    // the members array (with full per-member validation), and settings
+    // (getLastErrorModes / getLastErrorDefaults).  Finishes by computing the
+    // majority.  uasserts on any malformed input.
+    void ReplSetConfig::from(BSONObj o) {
+        static const string legal[] = {"_id","version", "members","settings"};
+        static const set<string> legals(legal, legal + 4);
+        assertOnlyHas(o, legals);
+
+        md5 = o.md5();
+        _id = o["_id"].String();
+        if( o["version"].ok() ) {
+            version = o["version"].numberInt();
+            uassert(13115, "bad " + rsConfigNs + " config: version", version > 0);
+        }
+
+        set<string> hosts;
+        set<int> ords;
+        vector<BSONElement> members;
+        try {
+            members = o["members"].Array();
+        }
+        catch(...) {
+            uasserted(13131, "replSet error parsing (or missing) 'members' field in config object");
+        }
+
+        unsigned localhosts = 0;
+        for( unsigned i = 0; i < members.size(); i++ ) {
+            BSONObj mobj = members[i].Obj();
+            MemberCfg m;
+            try {
+                static const string legal[] = {
+                    "_id","votes","priority","host", "hidden","slaveDelay",
+                    "arbiterOnly","buildIndexes","tags","initialSync" // deprecated
+                };
+                static const set<string> legals(legal, legal + 10);
+                assertOnlyHas(mobj, legals);
+
+                try {
+                    m._id = (int) mobj["_id"].Number();
+                }
+                catch(...) {
+                    /* TODO: use of string exceptions may be problematic for reconfig case! */
+                    throw "_id must be numeric";
+                }
+                try {
+                    string s = mobj["host"].String();
+                    boost::trim(s);
+                    m.h = HostAndPort(s);
+                    if ( !m.h.hasPort() ) {
+                        // make port explicit even if default
+                        m.h.setPort(m.h.port());
+                    }
+                }
+                catch(...) {
+                    throw string("bad or missing host field? ") + mobj.toString();
+                }
+                if( m.h.isLocalHost() )
+                    localhosts++;
+                m.arbiterOnly = mobj["arbiterOnly"].trueValue();
+                m.slaveDelay = mobj["slaveDelay"].numberInt();
+                if( mobj.hasElement("hidden") )
+                    m.hidden = mobj["hidden"].trueValue();
+                if( mobj.hasElement("buildIndexes") )
+                    m.buildIndexes = mobj["buildIndexes"].trueValue();
+                if( mobj.hasElement("priority") )
+                    m.priority = mobj["priority"].Number();
+                if( mobj.hasElement("votes") )
+                    m.votes = (unsigned) mobj["votes"].Number();
+                if( mobj.hasElement("tags") ) {
+                    const BSONObj &t = mobj["tags"].Obj();
+                    for (BSONObj::iterator c = t.begin(); c.more(); c.next()) {
+                        m.tags[(*c).fieldName()] = (*c).String();
+                    }
+                    uassert(14827, "arbiters cannot have tags", !m.arbiterOnly || m.tags.empty() );
+                }
+                m.check();
+            }
+            catch( const char * p ) {
+                log() << "replSet cfg parsing exception for members[" << i << "] " << p << rsLog;
+                stringstream ss;
+                ss << "replSet members[" << i << "] " << p;
+                uassert(13107, ss.str(), false);
+            }
+            catch(DBException& e) {
+                log() << "replSet cfg parsing exception for members[" << i << "] " << e.what() << rsLog;
+                stringstream ss;
+                ss << "bad config for member[" << i << "] " << e.what();
+                uassert(13135, ss.str(), false);
+            }
+            // NOTE(review): the duplicate check below looks up h.toString() in
+            // 'hosts' but the insert a few lines down stores h.dynString() --
+            // if those two representations ever differ, a duplicate host could
+            // slip through; confirm they are equivalent here.
+            if( !(ords.count(m._id) == 0 && hosts.count(m.h.toString()) == 0) ) {
+                log() << "replSet " << o.toString() << rsLog;
+                uassert(13108, "bad replset config -- duplicate hosts in the config object?", false);
+            }
+            hosts.insert(m.h.dynString());
+            ords.insert(m._id);
+            this->members.push_back(m);
+        }
+        uassert(13393, "can't use localhost in repl set member names except when using it for all members", localhosts == 0 || localhosts == members.size());
+        uassert(13117, "bad " + rsConfigNs + " config", !_id.empty());
+
+        if( o["settings"].ok() ) {
+            BSONObj settings = o["settings"].Obj();
+            if( settings["getLastErrorModes"].ok() ) {
+                parseRules(settings["getLastErrorModes"].Obj());
+            }
+            ho.check();
+            try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); }
+            catch(...) { }
+        }
+
+        // figure out the majority for this config
+        setMajority();
+    }
+
+    // uassert wrapper so config-level sanity checks share error code 13122
+    static inline void configAssert(bool expr) {
+        uassert(13122, "bad repl set config?", expr);
+    }
+
+    // Build a config directly from a BSON document (e.g. rs.initiate/reconfig
+    // input).  With force=true the version is bumped by a large random amount
+    // so a forced reconfig wins against whatever versions other members hold.
+    ReplSetConfig::ReplSetConfig(BSONObj cfg, bool force) {
+        _constructed = false;
+        clear();
+        from(cfg);
+        if( force ) {
+            version += rand() % 100000 + 10000;
+        }
+        configAssert( version < 0 /*unspecified*/ || (version >= 1) );
+        if( version < 1 )
+            version = 1;
+        _ok = true;
+        _constructed = true;
+    }
+
+    // Try to load a config from host h.  For a remote host we first send a
+    // test heartbeat to confirm it runs in --replSet mode, then fetch (and
+    // count) documents in local.system.replset; for ourselves we read the
+    // collection directly (falling back to DBDirectClient while the listener
+    // is not yet up).  On failure, ok() stays false and 'version' encodes how
+    // far we got.  (An unused local BSONObj built via the BSON macro was
+    // removed.)
+    ReplSetConfig::ReplSetConfig(const HostAndPort& h) {
+        LOG(2) << "ReplSetConfig load " << h.toStringLong() << rsLog;
+
+        _constructed = false;
+        clear();
+        int level = 2;
+        DEV level = 0;
+
+        BSONObj cfg;
+        int v = -5; // version reported if we bail out before fetching anything
+        try {
+            if( h.isSelf() ) {
+                ;
+            }
+            else {
+                /* first, make sure other node is configured to be a replset. just to be safe. */
+                string setname = cmdLine.ourSetName();
+                int theirVersion;
+                BSONObj info;
+                log() << "trying to contact " << h.toString() << rsLog;
+                bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion);
+                if( info["rs"].trueValue() ) {
+                    // yes, it is a replicate set, although perhaps not yet initialized
+                }
+                else {
+                    if( !ok ) {
+                        log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog;
+                        if( !info.isEmpty() )
+                            log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog;
+                        return;
+                    }
+                    {
+                        stringstream ss;
+                        ss << "replSet error: member " << h.toString() << " is not in --replSet mode";
+                        msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught
+                        //for python err# checker: uassert(13260, "", false);
+                    }
+                }
+            }
+
+            v = -4;
+            unsigned long long count = 0;
+            try {
+                ScopedConn conn(h.toString());
+                v = -3;
+                cfg = conn.findOne(rsConfigNs, Query()).getOwned();
+                count = conn.count(rsConfigNs);
+            }
+            catch ( DBException& ) {
+                if ( !h.isSelf() ) {
+                    throw;
+                }
+
+                // on startup, socket is not listening yet
+                DBDirectClient cli;
+                cfg = cli.findOne( rsConfigNs, Query() ).getOwned();
+                count = cli.count(rsConfigNs);
+            }
+
+            if( count > 1 )
+                uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString());
+
+            if( cfg.isEmpty() ) {
+                version = EMPTYCONFIG;
+                return;
+            }
+            version = -1;
+        }
+        catch( DBException& e) {
+            version = v;
+            log(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog;
+            return;
+        }
+
+        from(cfg);
+        checkRsConfig();
+        _ok = true;
+        log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog;
+        _constructed = true;
+    }
+
+}
diff --git a/src/mongo/db/repl/rs_config.h b/src/mongo/db/repl/rs_config.h
new file mode 100644
index 00000000000..cfe2e86a568
--- /dev/null
+++ b/src/mongo/db/repl/rs_config.h
@@ -0,0 +1,251 @@
+// rs_config.h
+// repl set configuration
+//
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../../util/net/hostandport.h"
+#include "../../util/concurrency/race.h"
+#include "health.h"
+
+namespace mongo {
+ class Member;
+ const string rsConfigNs = "local.system.replset";
+
+ class ReplSetConfig {
+ enum { EMPTYCONFIG = -2 };
+ struct TagSubgroup;
+ public:
+ /**
+ * This contacts the given host and tries to get a config from them.
+ *
+ * This sends a test heartbeat to the host and, if all goes well and the
+ * host has a more recent config, fetches the config and loads it (see
+ * from()).
+ *
+ * If it's contacting itself, it skips the heartbeat (for obvious
+ * reasons.) If something is misconfigured, throws an exception. If the
+ * host couldn't be queried or is just blank, ok() will be false.
+ */
+ ReplSetConfig(const HostAndPort& h);
+
+ ReplSetConfig(BSONObj cfg, bool force=false);
+
+ bool ok() const { return _ok; }
+
+ struct TagRule;
+
+ struct MemberCfg {
+ MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false), buildIndexes(true) { }
+ int _id; /* ordinal */
+ unsigned votes; /* how many votes this node gets. default 1. */
+ HostAndPort h;
+ double priority; /* 0 means can never be primary */
+ bool arbiterOnly;
+ int slaveDelay; /* seconds. int rather than unsigned for convenient to/from bson conversion. */
+ bool hidden; /* if set, don't advertise to drivers in isMaster. for non-primaries (priority 0) */
+ bool buildIndexes; /* if false, do not create any non-_id indexes */
+ map<string,string> tags; /* tagging for data center, rack, etc. */
+ private:
+ set<TagSubgroup*> _groups; // the subgroups this member belongs to
+ public:
+ const set<TagSubgroup*>& groups() const {
+ return _groups;
+ }
+ set<TagSubgroup*>& groupsw() {
+ return _groups;
+ }
+ void check() const; /* check validity, assert if not. */
+ BSONObj asBson() const;
+ bool potentiallyHot() const { return !arbiterOnly && priority > 0; }
+ void updateGroups(const OpTime& last) {
+ RACECHECK
+ for (set<TagSubgroup*>::const_iterator it = groups().begin(); it != groups().end(); it++) {
+ ((TagSubgroup*)(*it))->updateLast(last);
+ }
+ }
+ bool operator==(const MemberCfg& r) const {
+ if (!tags.empty() || !r.tags.empty()) {
+ if (tags.size() != r.tags.size()) {
+ return false;
+ }
+
+ // if they are the same size and not equal, at least one
+ // element in A must be different in B
+ for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) {
+ map<string,string>::const_iterator rit = r.tags.find((*lit).first);
+
+ if (rit == r.tags.end() || (*lit).second != (*rit).second) {
+ return false;
+ }
+ }
+ }
+
+ return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
+ arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden &&
+ buildIndexes == r.buildIndexes; // BUGFIX: was "buildIndexes == buildIndexes" (always true)
+ }
+ bool operator!=(const MemberCfg& r) const { return !(*this == r); }
+ };
+
+ vector<MemberCfg> members;
+ string _id;
+ int version;
+ HealthOptions ho;
+ string md5;
+ BSONObj getLastErrorDefaults;
+ map<string,TagRule*> rules;
+
+ list<HostAndPort> otherMemberHostnames() const; // except self
+
+ /** @return true if could connect, and there is no cfg object there at all */
+ bool empty() const { return version == EMPTYCONFIG; }
+
+ string toString() const { return asBson().toString(); }
+
+ /** validate the settings. does not call check() on each member, you have to do that separately. */
+ void checkRsConfig() const;
+
+ /** check if modification makes sense */
+ static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg);
+
+ //static void receivedNewConfig(BSONObj);
+ void saveConfigLocally(BSONObj comment); // to local db
+ string saveConfigEverywhere(); // returns textual info on what happened
+
+ /**
+ * Update members' groups when the config changes but members stay the same.
+ */
+ void updateMembers(List1<Member> &dest);
+
+ BSONObj asBson() const;
+
+ /**
+ * Getter and setter for _majority. This is almost always
+ * members.size()/2+1, but can be the number of non-arbiter members if
+ * there are more arbiters than non-arbiters (writing to 3 out of 7
+ * servers is safe if 4 of the servers are arbiters).
+ */
+ void setMajority();
+ int getMajority() const;
+
+ bool _constructed;
+ private:
+ bool _ok;
+ int _majority;
+
+ void from(BSONObj);
+ void clear();
+
+ struct TagClause;
+
+ /**
+ * This is a logical grouping of servers. It is pointed to by a set of
+ * servers with a certain tag.
+ *
+ * For example, suppose servers A, B, and C have the tag "dc" : "nyc". If we
+ * have a rule {"dc" : 2}, then we want A _or_ B _or_ C to have the
+ * write for one of the "dc" criteria to be fulfilled, so all three will
+ * point to this subgroup. When one of their oplog-tailing cursors is
+ * updated, this subgroup is updated.
+ */
+ struct TagSubgroup : boost::noncopyable {
+ ~TagSubgroup(); // never called; not defined
+ TagSubgroup(string nm) : name(nm) { }
+ const string name;
+ OpTime last;
+ vector<TagClause*> clauses;
+
+ // this probably won't actually point to valid members after the
+ // subgroup is created, as initFromConfig() makes a copy of the
+ // config
+ set<MemberCfg*> m;
+
+ void updateLast(const OpTime& op);
+
+ //string toString() const;
+
+ /**
+ * If two tags have the same name, they should compare as equal so
+ * that members don't have to update two identical groups on writes.
+ */
+ bool operator() (TagSubgroup& lhs, TagSubgroup& rhs) const {
+ return lhs.name < rhs.name;
+ }
+ };
+
+ /**
+ * An argument in a rule. For example, if we had the rule {dc : 2,
+ * machines : 3}, "dc" : 2 and "machines" : 3 would be two TagClauses.
+ *
+ * Each tag clause has a set of associated subgroups. For example, if
+ * we had "dc" : 2, our subgroups might be "nyc", "sf", and "hk".
+ */
+ struct TagClause {
+ OpTime last;
+ map<string,TagSubgroup*> subgroups;
+ TagRule *rule;
+ string name;
+ /**
+ * If we have get a clause like {machines : 3} and this server is
+ * tagged with "machines", then it's really {machines : 2}, as we
+ * will always be up-to-date. So, target would be 3 and
+ * actualTarget would be 2, in that example.
+ */
+ int target;
+ int actualTarget;
+
+ void updateLast(const OpTime& op);
+ string toString() const;
+ };
+
+ /**
+ * Parses getLastErrorModes.
+ */
+ void parseRules(const BSONObj& modes);
+
+ /**
+ * Create a hash containing every possible clause that could be used in a
+ * rule and the servers related to that clause.
+ *
+ * For example, suppose we have the following servers:
+ * A {"dc" : "ny", "ny" : "rk1"}
+ * B {"dc" : "ny", "ny" : "rk1"}
+ * C {"dc" : "ny", "ny" : "rk2"}
+ * D {"dc" : "sf", "sf" : "rk1"}
+ * E {"dc" : "sf", "sf" : "rk2"}
+ *
+ * This would give us the possible criteria:
+ * "dc" -> {A, B, C},{D, E}
+ * "ny" -> {A, B},{C}
+ * "sf" -> {D},{E}
+ */
+ void _populateTagMap(map<string,TagClause> &tagMap);
+
+ public:
+ struct TagRule {
+ vector<TagClause*> clauses;
+ OpTime last;
+
+ void updateLast(const OpTime& op);
+ string toString() const;
+ };
+ };
+
+}
diff --git a/src/mongo/db/repl/rs_exception.h b/src/mongo/db/repl/rs_exception.h
new file mode 100644
index 00000000000..fc372fc241c
--- /dev/null
+++ b/src/mongo/db/repl/rs_exception.h
@@ -0,0 +1,17 @@
+// @file rs_exception.h
+
+#pragma once
+
+namespace mongo {
+
+ /** thrown to abort election/vote processing; carries no state.
+ presumably caught by the consensus code — confirm in consensus.cpp. */
+ class VoteException : public std::exception {
+ public:
+ const char * what() const throw () { return "VoteException"; }
+ };
+
+ /** thrown to signal the caller to sleep and retry the operation; carries no state. */
+ class RetryAfterSleepException : public std::exception {
+ public:
+ const char * what() const throw () { return "RetryAfterSleepException"; }
+ };
+
+}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
new file mode 100644
index 00000000000..b67c0d71b83
--- /dev/null
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -0,0 +1,271 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../repl.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "rs.h"
+#include "../oplogreader.h"
+#include "../../util/mongoutils/str.h"
+#include "../dbhelpers.h"
+#include "rs_optime.h"
+#include "../oplog.h"
+
+namespace mongo {
+
+ using namespace mongoutils;
+ using namespace bson;
+
+ void dropAllDatabasesExceptLocal();
+
+ // add try/catch with sleep
+
+ /** assertion helper for initial sync: if expr is false, publish the message
+ via sethbmsg (so it is visible in replica set status) and uassert(13404). */
+ void isyncassert(const string& msg, bool expr) {
+ if( !expr ) {
+ string m = str::stream() << "initial sync " << msg;
+ theReplSet->sethbmsg(m, 0);
+ uasserted(13404, m);
+ }
+ }
+
+ /** entry point for initial sync: ensure the oplog exists, then retry
+ _syncDoInitialSync() forever (sleeping 30s between attempts) until it
+ completes without throwing. */
+ void ReplSetImpl::syncDoInitialSync() {
+ createOplog();
+
+ while( 1 ) {
+ try {
+ _syncDoInitialSync();
+ break;
+ }
+ catch(DBException& e) {
+ sethbmsg("initial sync exception " + e.toString(), 0);
+ sleepsecs(30);
+ }
+ }
+ }
+
+ /* todo : progress metering to sethbmsg. */
+ /** clone one database from master. cloneFrom's error string is discarded;
+ only the success flag is returned to the caller. */
+ static bool clone(const char *master, string db) {
+ string err;
+ return cloneFrom(master, err, db, false,
+ /* slave_ok */ true, true, false, /*mayYield*/true, /*mayBeInterrupted*/false);
+ }
+
+ void _logOpObjRS(const BSONObj& op);
+
+ /** truncate the local replica-set oplog (no-op if it is absent or already empty). */
+ static void emptyOplog() {
+ writelock lk(rsoplog);
+ Client::Context ctx(rsoplog);
+ NamespaceDetails *d = nsdetails(rsoplog);
+
+ // BUGFIX: previously "if( d && d->stats.nrecords == 0 )" fell through and
+ // dereferenced a null d below when the collection didn't exist.
+ if( d == 0 || d->stats.nrecords == 0 )
+ return; // nothing to empty, ok.
+
+ LOG(1) << "replSet empty oplog" << rsLog;
+ d->emptyCappedCollection(rsoplog);
+ }
+
+ /**
+ * Choose a member to sync from: a member that is up, builds indexes if we
+ * do, is PRIMARY or a SECONDARY whose optime is ahead of ours, and is not
+ * currently vetoed. Returns NULL (and clears _currentSyncTarget) if no
+ * candidate exists or if we have not yet received 2N heartbeat pings.
+ * NOTE(review): the loop breaks on the first acceptable candidate in list
+ * order rather than scanning for the global minimum ping — confirm intended.
+ */
+ Member* ReplSetImpl::getMemberToSyncTo() {
+ Member *closest = 0;
+ time_t now = 0;
+ bool buildIndexes = true;
+
+ // wait for 2N pings before choosing a sync target
+ if (_cfg) {
+ int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings;
+
+ if (needMorePings > 0) {
+ OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl;
+ return NULL;
+ }
+
+ buildIndexes = myConfig().buildIndexes;
+ }
+
+ // find the member with the lowest ping time that has more data than me
+ for (Member *m = _members.head(); m; m = m->next()) {
+ if (m->hbinfo().up() &&
+ // make sure members with buildIndexes sync from other members w/indexes
+ (!buildIndexes || (buildIndexes && m->config().buildIndexes)) &&
+ (m->state() == MemberState::RS_PRIMARY ||
+ (m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) &&
+ (!closest || m->hbinfo().ping < closest->hbinfo().ping)) {
+
+ map<string,time_t>::iterator vetoed = _veto.find(m->fullName());
+ if (vetoed == _veto.end()) {
+ closest = m;
+ break;
+ }
+
+ // lazily fetch the current time only when a veto entry is found
+ if (now == 0) {
+ now = time(0);
+ }
+
+ // if this was on the veto list, check if it was vetoed in the last "while"
+ if ((*vetoed).second < now) {
+ _veto.erase(vetoed);
+ closest = m;
+ break;
+ }
+
+ // if it was recently vetoed, skip
+ log() << "replSet not trying to sync from " << (*vetoed).first
+ << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog;
+ }
+ }
+
+ {
+ lock lk(this);
+
+ if (!closest) {
+ _currentSyncTarget = NULL;
+ return NULL;
+ }
+
+ _currentSyncTarget = closest;
+ }
+
+ sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);
+
+ return closest;
+ }
+
+ /** blacklist host as a sync source until now + secs (consulted by getMemberToSyncTo). */
+ void ReplSetImpl::veto(const string& host, const unsigned secs) {
+ _veto[host] = time(0)+secs;
+ }
+
+ /**
+ * Do the initial sync for this member: pick a sync source, optionally
+ * (unless --fastsync) drop and re-clone all databases from it, then note
+ * the source's latest optime as minValid and apply the remote oplog from
+ * the starting optime up to minValid so the cloned copy is consistent.
+ * On any failure it records a status message via sethbmsg, sleeps briefly
+ * and returns; syncDoInitialSync() retries until success.
+ */
+ void ReplSetImpl::_syncDoInitialSync() {
+ sethbmsg("initial sync pending",0);
+
+ // if this is the first node, it may have already become primary
+ if ( box.getState().primary() ) {
+ sethbmsg("I'm already primary, no need for initial sync",0);
+ return;
+ }
+
+ const Member *source = getMemberToSyncTo();
+ if (!source) {
+ sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0);
+ sleepsecs(15);
+ return;
+ }
+
+ string sourceHostname = source->h().toString();
+ OplogReader r;
+ if( !r.connect(sourceHostname) ) {
+ sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0);
+ sleepsecs(15);
+ return;
+ }
+
+ BSONObj lastOp = r.getLastOp(rsoplog);
+ if( lastOp.isEmpty() ) {
+ sethbmsg("initial sync couldn't read remote oplog", 0);
+ sleepsecs(15);
+ return;
+ }
+ OpTime startingTS = lastOp["ts"]._opTime();
+
+ if (replSettings.fastsync) {
+ log() << "fastsync: skipping database clone" << rsLog;
+ }
+ else {
+ sethbmsg("initial sync drop all databases", 0);
+ dropAllDatabasesExceptLocal();
+
+ sethbmsg("initial sync clone all databases", 0);
+
+ list<string> dbs = r.conn()->getDatabaseNames();
+ for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ string db = *i;
+ if( db != "local" ) {
+ sethbmsg( str::stream() << "initial sync cloning db: " << db , 0);
+ bool ok;
+ {
+ writelock lk(db);
+ Client::Context ctx(db);
+ ok = clone(sourceHostname.c_str(), db);
+ }
+ if( !ok ) {
+ sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0);
+ veto(source->fullName(), 600);
+ sleepsecs(300);
+ return;
+ }
+ }
+ }
+ }
+
+ sethbmsg("initial sync query minValid",0);
+
+ /* our cloned copy will be strange until we apply oplog events that occurred
+ through the process. we note that time point here. */
+ BSONObj minValid = r.getLastOp(rsoplog);
+ isyncassert( "getLastOp is empty ", !minValid.isEmpty() );
+ OpTime mvoptime = minValid["ts"]._opTime();
+ assert( !mvoptime.isNull() );
+ assert( mvoptime >= startingTS );
+
+ // apply startingTS..mvoptime portion of the oplog
+ {
+ // note we assume here that this call does not throw
+ if( ! initialSyncOplogApplication(startingTS, mvoptime) ) {
+ log() << "replSet initial sync failed during oplog application phase" << rsLog;
+
+ emptyOplog(); // otherwise we'll be up!
+
+ lastOpTimeWritten = OpTime();
+ lastH = 0;
+
+ log() << "replSet cleaning up [1]" << rsLog;
+ {
+ writelock lk("local.");
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ }
+ log() << "replSet cleaning up [2]" << rsLog;
+
+ log() << "replSet initial sync failed will try again" << endl;
+
+ sleepsecs(5);
+ return;
+ }
+ }
+
+ sethbmsg("initial sync finishing up",0);
+
+ assert( !box.getState().primary() ); // wouldn't make sense if we were.
+
+ {
+ writelock lk("local.");
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ try {
+ log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog;
+ }
+ catch(...) { }
+ Helpers::putSingleton("local.replset.minvalid", minValid);
+ cx.db()->flushFiles(true);
+ }
+
+ sethbmsg("initial sync done",0);
+ }
+
+}
diff --git a/src/mongo/db/repl/rs_initiate.cpp b/src/mongo/db/repl/rs_initiate.cpp
new file mode 100644
index 00000000000..77bc6c03938
--- /dev/null
+++ b/src/mongo/db/repl/rs_initiate.cpp
@@ -0,0 +1,269 @@
+/* @file rs_initiate.cpp
+ */
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../cmdline.h"
+#include "../commands.h"
+#include "../../util/mmap.h"
+#include "../../util/mongoutils/str.h"
+#include "health.h"
+#include "rs.h"
+#include "rs_config.h"
+#include "../dbhelpers.h"
+#include "../oplog.h"
+
+using namespace bson;
+using namespace mongoutils;
+
+namespace mongo {
+
+ /* called on a reconfig AND on initiate
+ throws (uasserts with the specific error codes below) on any problem
+ @param cfg the proposed configuration
+ @param result output builder; a "down" array of unreachable hosts is appended if any
+ @param initial true when initiating, false when reconfiguring
+ */
+ void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial) {
+ int failures = 0, allVotes = 0, allowableFailures = 0;
+ int me = 0;
+ stringstream selfs;
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
+ if( i->h.isSelf() ) {
+ me++;
+ if( me > 1 )
+ selfs << ',';
+ selfs << i->h.toString();
+ if( !i->potentiallyHot() ) {
+ uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary");
+ }
+ }
+ allVotes += i->votes;
+ }
+ // failures are tolerable as long as a majority of votes remains reachable
+ allowableFailures = allVotes - (allVotes/2 + 1);
+
+ uassert(13278, "bad config: isSelf is true for multiple hosts: " + selfs.str(), me <= 1); // dups?
+ if( me != 1 ) {
+ stringstream ss;
+ ss << "can't find self in the replset config";
+ if( !cmdLine.isDefaultPort() ) ss << " my port: " << cmdLine.port;
+ if( me != 0 ) ss << " found: " << me;
+ uasserted(13279, ss.str());
+ }
+
+ vector<string> down;
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
+ // we know we're up
+ if (i->h.isSelf()) {
+ continue;
+ }
+
+ BSONObj res;
+ {
+ bool ok = false;
+ try {
+ int theirVersion = -1000;
+ ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/);
+ if( theirVersion >= cfg.version ) {
+ stringstream ss;
+ ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure";
+ uasserted(13259, ss.str());
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog;
+ }
+ catch(...) {
+ log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog;
+ }
+ if( res.getBoolField("mismatch") )
+ uasserted(13145, "set name does not match the set name host " + i->h.toString() + " expects");
+ if( *res.getStringField("set") ) {
+ if( cfg.version <= 1 ) {
+ // this was to be initiation, no one should be initiated already.
+ uasserted(13256, "member " + i->h.toString() + " is already initiated");
+ }
+ else {
+ // Assure no one has a newer config.
+ if( res["v"].Int() >= cfg.version ) {
+ uasserted(13341, "member " + i->h.toString() + " has a config version >= to the new cfg version; cannot change config");
+ }
+ }
+ }
+ if( !ok && !res["rs"].trueValue() ) {
+ down.push_back(i->h.toString());
+
+ if( !res.isEmpty() ) {
+ /* strange. got a response, but not "ok". log it. */
+ log() << "replSet warning " << i->h.toString() << " replied: " << res.toString() << rsLog;
+ }
+
+ bool allowFailure = false;
+ failures += i->votes;
+ if( !initial && failures <= allowableFailures ) {
+ const Member* m = theReplSet->findById( i->_id );
+ if( m ) {
+ assert( m->h().toString() == i->h.toString() );
+ }
+ // it's okay if the down member isn't part of the config,
+ // we might be adding a new member that isn't up yet
+ allowFailure = true;
+ }
+
+ if( !allowFailure ) {
+ string msg = string("need all members up to initiate, not ok : ") + i->h.toStringLong();
+ if( !initial )
+ msg = string("need most members up to reconfigure, not ok : ") + i->h.toString();
+ uasserted(13144, msg);
+ }
+ }
+ }
+ if( initial ) {
+ bool hasData = res["hasData"].Bool();
+ uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.",
+ !hasData || i->h.isSelf());
+ }
+ }
+ if (down.size() > 0) {
+ result.append("down", down);
+ }
+ }
+
+ /** implements the replSetInitiate admin command: builds a default config
+ (or validates the supplied one), checks that all members are reachable
+ and empty, then saves the config locally. */
+ class CmdReplSetInitiate : public ReplSetCommand {
+ public:
+ virtual LockType locktype() const { return NONE; }
+ CmdReplSetInitiate() : ReplSetCommand("replSetInitiate") { }
+ virtual void help(stringstream& h) const {
+ h << "Initiate/christen a replica set.";
+ h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ log() << "replSet replSetInitiate admin command received from client" << rsLog;
+
+ if( !replSet ) {
+ errmsg = "server is not running with --replSet";
+ return false;
+ }
+ if( theReplSet ) {
+ errmsg = "already initialized";
+ result.append("info", "try querying " + rsConfigNs + " to see current configuration");
+ return false;
+ }
+
+ {
+ // just make sure we can get a write lock before doing anything else. we'll reacquire one
+ // later. of course it could be stuck then, but this check lowers the risk if weird things
+ // are up.
+ time_t t = time(0);
+ writelock lk("");
+ if( time(0)-t > 10 ) {
+ errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
+ return false;
+ }
+
+ /* check that we don't already have an oplog. that could cause issues.
+ it is ok if the initiating member has *other* data than that.
+ */
+ BSONObj o;
+ if( Helpers::getFirst(rsoplog, o) ) {
+ errmsg = rsoplog + string(" is not empty on the initiating member. cannot initiate.");
+ return false;
+ }
+ }
+
+ if( ReplSet::startupStatus == ReplSet::BADCONFIG ) {
+ errmsg = "server already in BADCONFIG state (check logs); not initiating";
+ result.append("info", ReplSet::startupStatusMsg.get());
+ return false;
+ }
+ if( ReplSet::startupStatus != ReplSet::EMPTYCONFIG ) {
+ result.append("startupStatus", ReplSet::startupStatus);
+ errmsg = "all members and seeds must be reachable to initiate set";
+ result.append("info", cmdLine._replSet);
+ return false;
+ }
+
+ BSONObj configObj;
+
+ // no config document supplied: synthesize one from the --replSet seed list
+ if( cmdObj["replSetInitiate"].type() != Object ) {
+ result.append("info2", "no configuration explicitly specified -- making one");
+ log() << "replSet info initiate : no configuration specified. Using a default configuration for the set" << rsLog;
+
+ string name;
+ vector<HostAndPort> seeds;
+ set<HostAndPort> seedSet;
+ parseReplsetCmdLine(cmdLine._replSet, name, seeds, seedSet); // may throw...
+
+ bob b;
+ b.append("_id", name);
+ bob members;
+ members.append("0", BSON( "_id" << 0 << "host" << HostAndPort::Me().dynString() ));
+ result.append("me", HostAndPort::Me().toString());
+ for( unsigned i = 0; i < seeds.size(); i++ )
+ members.append(bob::numStr(i+1), BSON( "_id" << i+1 << "host" << seeds[i].toString()));
+ b.appendArray("members", members.obj());
+ configObj = b.obj();
+ log() << "replSet created this configuration for initiation : " << configObj.toString() << rsLog;
+ }
+ else {
+ configObj = cmdObj["replSetInitiate"].Obj();
+ }
+
+ bool parsed = false;
+ try {
+ ReplSetConfig newConfig(configObj);
+ parsed = true;
+
+ if( newConfig.version > 1 ) {
+ errmsg = "can't initiate with a version number greater than 1";
+ return false;
+ }
+
+ log() << "replSet replSetInitiate config object parses ok, " << newConfig.members.size() << " members specified" << rsLog;
+
+ checkMembersUpForConfigChange(newConfig, result, true);
+
+ log() << "replSet replSetInitiate all members seem up" << rsLog;
+
+ createOplog();
+
+ writelock lk("");
+ bo comment = BSON( "msg" << "initiating set");
+ newConfig.saveConfigLocally(comment);
+ log() << "replSet replSetInitiate config now saved locally. Should come online in about a minute." << rsLog;
+ result.append("info", "Config now saved locally. Should come online in about a minute.");
+ ReplSet::startupStatus = ReplSet::SOON;
+ ReplSet::startupStatusMsg.set("Received replSetInitiate - should come online shortly.");
+ }
+ catch( DBException& e ) {
+ log() << "replSet replSetInitiate exception: " << e.what() << rsLog;
+ if( !parsed )
+ errmsg = string("couldn't parse cfg object ") + e.what();
+ else
+ errmsg = string("couldn't initiate : ") + e.what();
+ return false;
+ }
+ catch( string& e2 ) {
+ log() << e2 << rsLog;
+ errmsg = e2;
+ return false;
+ }
+
+ return true;
+ }
+ } cmdReplSetInitiate;
+
+}
diff --git a/src/mongo/db/repl/rs_member.h b/src/mongo/db/repl/rs_member.h
new file mode 100644
index 00000000000..24e593392b6
--- /dev/null
+++ b/src/mongo/db/repl/rs_member.h
@@ -0,0 +1,131 @@
+// @file rsmember.h
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/** replica set member */
+
+#pragma once
+
+#include "../../util/concurrency/value.h"
+
+namespace mongo {
+
+
+ /*
+ RS_STARTUP serving still starting up, or still trying to initiate the set
+ RS_PRIMARY this server thinks it is primary
+ RS_SECONDARY this server thinks it is a secondary (slave mode)
+ RS_RECOVERING recovering/resyncing; after recovery usually auto-transitions to secondary
+ RS_FATAL something bad has occurred and server is not completely offline with regard to the replica set. fatal error.
+ RS_STARTUP2 loaded config, still determining who is primary
+ */
+ /** wrapper around the replica-set member state enum; see the table above. */
+ struct MemberState {
+ enum MS {
+ RS_STARTUP = 0,
+ RS_PRIMARY = 1,
+ RS_SECONDARY = 2,
+ RS_RECOVERING = 3,
+ RS_FATAL = 4,
+ RS_STARTUP2 = 5,
+ RS_UNKNOWN = 6, /* remote node not yet reached */
+ RS_ARBITER = 7,
+ RS_DOWN = 8, /* node not reachable for a report */
+ RS_ROLLBACK = 9
+ } s;
+
+ MemberState(MS ms = RS_UNKNOWN) : s(ms) { }
+ explicit MemberState(int ms) : s((MS) ms) { }
+
+ bool startup() const { return s == RS_STARTUP; }
+ bool primary() const { return s == RS_PRIMARY; }
+ bool secondary() const { return s == RS_SECONDARY; }
+ bool recovering() const { return s == RS_RECOVERING; }
+ bool startup2() const { return s == RS_STARTUP2; }
+ bool fatal() const { return s == RS_FATAL; }
+ bool rollback() const { return s == RS_ROLLBACK; }
+ // primary or secondary, i.e. states in which the node holds readable data
+ bool readable() const { return s == RS_PRIMARY || s == RS_SECONDARY; }
+
+ string toString() const;
+
+ bool operator==(const MemberState& r) const { return s == r.s; }
+ bool operator!=(const MemberState& r) const { return s != r.s; }
+ };
+
+ /* this is supposed to be just basic information on a member,
+ and copy constructable. */
+ class HeartbeatInfo {
+ unsigned _id;
+ public:
+ // BUGFIX: upSince and lastHeartbeat were left uninitialized by this
+ // default constructor (the HeartbeatInfo(unsigned) ctor does zero them).
+ HeartbeatInfo() : _id(0xffffffff), hbstate(MemberState::RS_UNKNOWN), health(-1.0),
+ upSince(0), downSince(0), lastHeartbeat(0), skew(INT_MIN), authIssue(false), ping(0) { }
+ HeartbeatInfo(unsigned id);
+ unsigned id() const { return _id; }
+ MemberState hbstate;
+ double health;
+ time_t upSince;
+ long long downSince;
+ time_t lastHeartbeat;
+ DiagStr lastHeartbeatMsg;
+ OpTime opTime;
+ int skew;
+ bool authIssue;
+ unsigned int ping; // milliseconds
+ static unsigned int numPings;
+
+ bool up() const { return health > 0; }
+
+ /** health is set to -1 on startup. that means we haven't even checked yet. 0 means we checked and it failed. */
+ bool maybeUp() const { return health != 0; }
+
+ long long timeDown() const; // ms
+
+ /* true if changed in a way of interest to the repl set manager. */
+ bool changed(const HeartbeatInfo& old) const;
+ };
+
+ /** construct heartbeat state for member id; all monitoring fields start
+ as "unknown / never checked" (health -1, state RS_UNKNOWN). */
+ inline HeartbeatInfo::HeartbeatInfo(unsigned id) :
+ _id(id),
+ authIssue(false),
+ ping(0) {
+ hbstate = MemberState::RS_UNKNOWN;
+ health = -1.0;
+ downSince = 0;
+ lastHeartbeat = upSince = 0;
+ skew = INT_MIN;
+ }
+
+ // only health and state transitions count as "changed"; ping, optime,
+ // skew etc. are deliberately ignored here
+ inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
+ return health != old.health ||
+ hbstate != old.hbstate;
+ }
+
+ /** human-readable name of the state; returns "" for out-of-range values. */
+ inline string MemberState::toString() const {
+ switch ( s ) {
+ case RS_STARTUP: return "STARTUP";
+ case RS_PRIMARY: return "PRIMARY";
+ case RS_SECONDARY: return "SECONDARY";
+ case RS_RECOVERING: return "RECOVERING";
+ case RS_FATAL: return "FATAL";
+ case RS_STARTUP2: return "STARTUP2";
+ case RS_ARBITER: return "ARBITER";
+ case RS_DOWN: return "DOWN";
+ case RS_ROLLBACK: return "ROLLBACK";
+ case RS_UNKNOWN: return "UNKNOWN";
+ }
+ return "";
+ }
+
+}
diff --git a/src/mongo/db/repl/rs_optime.h b/src/mongo/db/repl/rs_optime.h
new file mode 100644
index 00000000000..f0ca56927ad
--- /dev/null
+++ b/src/mongo/db/repl/rs_optime.h
@@ -0,0 +1,58 @@
+// @file rs_optime.h
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "../../util/optime.h"
+
+namespace mongo {
+
+ // fixed namespace (collection name) of the replica-set oplog
+ const char rsoplog[] = "local.oplog.rs";
+
+ // the commented-out sketches below are retained for historical reference only
+ /*
+ class RSOpTime : public OpTime {
+ public:
+ bool initiated() const { return getSecs() != 0; }
+ };*/
+
+ /*struct RSOpTime {
+ unsigned long long ord;
+
+ RSOpTime() : ord(0) { }
+
+ bool initiated() const { return ord > 0; }
+
+ void initiate() {
+ assert( !initiated() );
+ ord = 1000000;
+ }
+
+ ReplTime inc() {
+ DEV assertInWriteLock();
+ return ++ord;
+ }
+
+ string toString() const { return str::stream() << ord; }
+
+ // query the oplog and set the highest value herein. acquires a db read lock. throws.
+ void load();
+ };
+
+ extern RSOpTime rsOpTime;*/
+
+}
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
new file mode 100644
index 00000000000..10727c59669
--- /dev/null
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -0,0 +1,667 @@
+/* @file rs_rollback.cpp
+*
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "rs.h"
+#include "../repl.h"
+#include "../ops/query.h"
+#include "../cloner.h"
+#include "../ops/update.h"
+#include "../ops/delete.h"
+
+/* Scenarios
+
+ We went offline with ops not replicated out.
+
+ F = node that failed and coming back.
+ P = node that took over, new primary
+
+ #1:
+ F : a b c d e f g
+ P : a b c d q
+
+ The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P
+ will have significantly more data. Also note that P may have a proper subset of F's stream if there were
+ no subsequent writes.
+
+ For now the model is simply : get F back in sync with P. If P was really behind or something, we should have
+ just chosen not to fail over anyway.
+
+ #2:
+ F : a b c d e f g -> a b c d
+ P : a b c d
+
+ #3:
+ F : a b c d e f g -> a b c d q r s t u v w x z
+ P : a b c d.q r s t u v w x z
+
+ Steps
+ find an event in common. 'd'.
+ undo our events beyond that by:
+ (1) taking copy from other server of those objects
+ (2) do not consider copy valid until we reach an optime after when we fetched the new version of object
+ -- i.e., reset minvalid.
+ (3) we could skip operations on objects that are previous in time to our capture of the object as an optimization.
+
+*/
+
+namespace mongo {
+
+ using namespace bson;
+
+ void incRBID();
+
+ class rsfatal : public std::exception {
+ public:
+ virtual const char* what() const throw() { return "replica set fatal exception"; }
+ };
+
+ struct DocID {
+ const char *ns;
+ be _id;
+ bool operator<(const DocID& d) const {
+ int c = strcmp(ns, d.ns);
+ if( c < 0 ) return true;
+ if( c > 0 ) return false;
+ return _id < d._id;
+ }
+ };
+
+ struct HowToFixUp {
+ /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only
+ need to refetch it once. */
+ set<DocID> toRefetch;
+
+ /* collections to drop */
+ set<string> toDrop;
+
+ set<string> collectionsToResync;
+
+ OpTime commonPoint;
+ DiskLoc commonPointOurDiskloc;
+
+ int rbid; // remote server's current rollback sequence #
+ };
+
+ static void refetch(HowToFixUp& h, const BSONObj& ourObj) {
+ const char *op = ourObj.getStringField("op");
+ if( *op == 'n' )
+ return;
+
+ unsigned long long totSize = 0;
+ totSize += ourObj.objsize();
+ if( totSize > 512 * 1024 * 1024 )
+ throw "rollback too large";
+
+ DocID d;
+ // NOTE The assigned ns value may become invalid if we yield.
+ d.ns = ourObj.getStringField("ns");
+ if( *d.ns == 0 ) {
+ log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog;
+ return;
+ }
+
+ bo o = ourObj.getObjectField(*op=='u' ? "o2" : "o");
+ if( o.isEmpty() ) {
+ log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog;
+ return;
+ }
+
+ if( *op == 'c' ) {
+ be first = o.firstElement();
+ NamespaceString s(d.ns); // foo.$cmd
+ string cmdname = first.fieldName();
+ Command *cmd = Command::findCommand(cmdname.c_str());
+ if( cmd == 0 ) {
+ log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog;
+ return;
+ }
+ else {
+ /* findandmodify - translated?
+ godinsert?,
+ renamecollection a->b. just resync a & b
+ */
+ if( cmdname == "create" ) {
+ /* Create collection operation
+ { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ */
+ string ns = s.db + '.' + o["create"].String(); // -> foo.abc
+ h.toDrop.insert(ns);
+ return;
+ }
+ else if( cmdname == "drop" ) {
+ string ns = s.db + '.' + first.valuestr();
+ h.collectionsToResync.insert(ns);
+ return;
+ }
+ else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) {
+ /* TODO: this is bad. we simply full resync the collection here, which could be very slow. */
+ log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog;
+ string ns = s.db + '.' + first.valuestr();
+ h.collectionsToResync.insert(ns);
+ return;
+ }
+ else if( cmdname == "renameCollection" ) {
+ /* TODO: slow. */
+ log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog;
+ string from = first.valuestr();
+ string to = o["to"].String();
+ h.collectionsToResync.insert(from);
+ h.collectionsToResync.insert(to);
+ return;
+ }
+ else if( cmdname == "reIndex" ) {
+ return;
+ }
+ else if( cmdname == "dropDatabase" ) {
+ log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog;
+ log() << "replSet " << o.toString() << rsLog;
+ throw rsfatal();
+ }
+ else {
+ log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog;
+ log() << "replSet cmdname=" << cmdname << rsLog;
+ throw rsfatal();
+ }
+ }
+ }
+
+ d._id = o["_id"];
+ if( d._id.eoo() ) {
+ log() << "replSet WARNING ignoring op on rollback no _id TODO : " << d.ns << ' '<< ourObj.toString() << rsLog;
+ return;
+ }
+
+ h.toRefetch.insert(d);
+ }
+
+ int getRBID(DBClientConnection*);
+
+ static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) {
+ static time_t last;
+ if( time(0)-last < 60 ) {
+ throw "findcommonpoint waiting a while before trying again";
+ }
+ last = time(0);
+
+ assert( d.dbMutex.atLeastReadLocked() );
+ Client::Context c(rsoplog);
+ NamespaceDetails *nsd = nsdetails(rsoplog);
+ assert(nsd);
+ ReverseCappedCursor u(nsd);
+ if( !u.ok() )
+ throw "our oplog empty or unreadable";
+
+ const Query q = Query().sort(reverseNaturalObj);
+ const bo fields = BSON( "ts" << 1 << "h" << 1 );
+
+ //auto_ptr<DBClientCursor> u = us->query(rsoplog, q, 0, 0, &fields, 0, 0);
+
+ h.rbid = getRBID(them);
+ auto_ptr<DBClientCursor> t = them->query(rsoplog, q, 0, 0, &fields, 0, 0);
+
+ if( t.get() == 0 || !t->more() ) throw "remote oplog empty or unreadable";
+
+ BSONObj ourObj = u.current();
+ OpTime ourTime = ourObj["ts"]._opTime();
+ BSONObj theirObj = t->nextSafe();
+ OpTime theirTime = theirObj["ts"]._opTime();
+
+ {
+ long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs());
+ /* diff could be positive, negative, or zero */
+ log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog;
+ log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog;
+ log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog;
+ if( diff > 1800 ) {
+ log() << "replSet rollback too long a time period for a rollback." << rsLog;
+ throw "error not willing to roll back more than 30 minutes of data";
+ }
+ }
+
+ unsigned long long scanned = 0;
+ while( 1 ) {
+ scanned++;
+ /* todo add code to assure no excessive scanning for too long */
+ if( ourTime == theirTime ) {
+ if( ourObj["h"].Long() == theirObj["h"].Long() ) {
+ // found the point back in time where we match.
+ // todo : check a few more just to be careful about hash collisions.
+ log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog;
+ log() << "replSet rollback findcommonpoint scanned : " << scanned << rsLog;
+ h.commonPoint = ourTime;
+ h.commonPointOurDiskloc = u.currLoc();
+ return;
+ }
+
+ refetch(h, ourObj);
+
+ if( !t->more() ) {
+ log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS100 reached beginning of remote oplog [2]";
+ }
+ theirObj = t->nextSafe();
+ theirTime = theirObj["ts"]._opTime();
+
+ u.advance();
+ if( !u.ok() ) {
+ log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS101 reached beginning of local oplog [1]";
+ }
+ ourObj = u.current();
+ ourTime = ourObj["ts"]._opTime();
+ }
+ else if( theirTime > ourTime ) {
+ if( !t->more() ) {
+ log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS100 reached beginning of remote oplog [1]";
+ }
+ theirObj = t->nextSafe();
+ theirTime = theirObj["ts"]._opTime();
+ }
+ else {
+ // theirTime < ourTime
+ refetch(h, ourObj);
+ u.advance();
+ if( !u.ok() ) {
+ log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS101 reached beginning of local oplog [2]";
+ }
+ ourObj = u.current();
+ ourTime = ourObj["ts"]._opTime();
+ }
+ }
+ }
+
+ struct X {
+ const bson::bo *op;
+ bson::bo goodVersionOfObject;
+ };
+
+ static void setMinValid(bo newMinValid) {
+ try {
+ log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog;
+ }
+ catch(...) { }
+ {
+ Helpers::putSingleton("local.replset.minvalid", newMinValid);
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ }
+ }
+
+ void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
+ DBClientConnection *them = r.conn();
+
+ // fetch all first so we needn't handle interruption in a fancy way
+
+ unsigned long long totSize = 0;
+
+ list< pair<DocID,bo> > goodVersions;
+
+ bo newMinValid;
+
+ /* fetch all the goodVersions of each document from current primary */
+ DocID d;
+ unsigned long long n = 0;
+ try {
+ for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) {
+ d = *i;
+
+ assert( !d._id.eoo() );
+
+ {
+ /* TODO : slow. lots of round trips. */
+ n++;
+ bo good= them->findOne(d.ns, d._id.wrap(), NULL, QueryOption_SlaveOk).getOwned();
+ totSize += good.objsize();
+ uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 );
+
+ // note good might be eoo, indicating we should delete it
+ goodVersions.push_back(pair<DocID,bo>(d,good));
+ }
+ }
+ newMinValid = r.getLastOp(rsoplog);
+ if( newMinValid.isEmpty() ) {
+ sethbmsg("rollback error newMinValid empty?");
+ return;
+ }
+ }
+ catch(DBException& e) {
+ sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0);
+ log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
+ throw e;
+ }
+
+ MemoryMappedFile::flushAll(true);
+
+ sethbmsg("rollback 3.5");
+ if( h.rbid != getRBID(r.conn()) ) {
+ // our source rolled back itself. so the data we received isn't necessarily consistent.
+ sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt");
+ return;
+ }
+
+ // update them
+ sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size());
+
+ bool warn = false;
+
+ assert( !h.commonPointOurDiskloc.isNull() );
+
+ mongo::d.dbMutex.assertWriteLocked();
+
+ /* we have items we are writing that aren't from a point-in-time. thus best not to come online
+ until we get to that point in freshness. */
+ setMinValid(newMinValid);
+
+ /** any full collection resyncs required? */
+ if( !h.collectionsToResync.empty() ) {
+ for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) {
+ string ns = *i;
+ sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns);
+
+ Client::Context c(ns);
+ {
+ bob res;
+ string errmsg;
+ dropCollection(ns, errmsg, res);
+ {
+ dbtemprelease r;
+ bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, errmsg);
+ uassert(15909, str::stream() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg, ok);
+ }
+ }
+ }
+
+ /* we did more reading from primary, so check it again for a rollback (which would mess us up), and
+ make minValid newer.
+ */
+ sethbmsg("rollback 4.2");
+ {
+ string err;
+ try {
+ newMinValid = r.getLastOp(rsoplog);
+ if( newMinValid.isEmpty() ) {
+ err = "can't get minvalid from primary";
+ }
+ else {
+ setMinValid(newMinValid);
+ }
+ }
+ catch (DBException&) {
+ err = "can't get/set minvalid";
+ }
+ if( h.rbid != getRBID(r.conn()) ) {
+ // our source rolled back itself. so the data we received isn't necessarily consistent.
+ // however, we've now done writes. thus we have a problem.
+ err += "rbid at primary changed during resync/rollback";
+ }
+ if( !err.empty() ) {
+ log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog;
+ /* todo: reset minvalid so that we are permanently in fatal state */
+ /* todo: don't be fatal, but rather, get all the data first. */
+ sethbmsg("rollback error");
+ throw rsfatal();
+ }
+ }
+ sethbmsg("rollback 4.3");
+ }
+
+ sethbmsg("rollback 4.6");
+ /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */
+ for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) {
+ Client::Context c(*i);
+ try {
+ bob res;
+ string errmsg;
+ log(1) << "replSet rollback drop: " << *i << rsLog;
+ dropCollection(*i, errmsg, res);
+ }
+ catch(...) {
+ log() << "replset rollback error dropping collection " << *i << rsLog;
+ }
+ }
+
+ sethbmsg("rollback 4.7");
+ Client::Context c(rsoplog);
+ NamespaceDetails *oplogDetails = nsdetails(rsoplog);
+ uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
+
+ map<string,shared_ptr<RemoveSaver> > removeSavers;
+
+ unsigned deletes = 0, updates = 0;
+ for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) {
+ const DocID& d = i->first;
+ bo pattern = d._id.wrap(); // { _id : ... }
+ try {
+ assert( d.ns && *d.ns );
+ if( h.collectionsToResync.count(d.ns) ) {
+ /* we just synced this entire collection */
+ continue;
+ }
+
+ getDur().commitIfNeeded();
+
+ /* keep an archive of items rolled back */
+ shared_ptr<RemoveSaver>& rs = removeSavers[d.ns];
+ if ( ! rs )
+ rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) );
+
+ // todo: lots of overhead in context, this can be faster
+ Client::Context c(d.ns);
+ if( i->second.isEmpty() ) {
+ // wasn't on the primary; delete.
+ /* TODO1.6 : can't delete from a capped collection. need to handle that here. */
+ deletes++;
+
+ NamespaceDetails *nsd = nsdetails(d.ns);
+ if( nsd ) {
+ if( nsd->capped ) {
+ /* can't delete from a capped collection - so we truncate instead. if this item must go,
+ so must all successors!!! */
+ try {
+ /** todo: IIRC cappedTruncateAfter does not handle completely empty. todo. */
+ // this will be crazy slow if no _id index.
+ long long start = Listener::getElapsedTimeMillis();
+ DiskLoc loc = Helpers::findOne(d.ns, pattern, false);
+ if( Listener::getElapsedTimeMillis() - start > 200 )
+ log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog;
+ //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern);
+ if( !loc.isNull() ) {
+ try {
+ nsd->cappedTruncateAfter(d.ns, loc, true);
+ }
+ catch(DBException& e) {
+ if( e.getCode() == 13415 ) {
+ // hack: need to just make cappedTruncate do this...
+ nsd->emptyCappedCollection(d.ns);
+ }
+ else {
+ throw;
+ }
+ }
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog;
+ }
+ }
+ else {
+ try {
+ deletes++;
+ deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() );
+ }
+ catch(...) {
+ log() << "replSet error rollback delete failed ns:" << d.ns << rsLog;
+ }
+ }
+ // did we just empty the collection? if so let's check if it even exists on the source.
+ if( nsd->stats.nrecords == 0 ) {
+ try {
+ string sys = cc().database()->name + ".system.namespaces";
+ bo o = them->findOne(sys, QUERY("name"<<d.ns));
+ if( o.isEmpty() ) {
+ // we should drop
+ try {
+ bob res;
+ string errmsg;
+ dropCollection(d.ns, errmsg, res);
+ }
+ catch(...) {
+ log() << "replset error rolling back collection " << d.ns << rsLog;
+ }
+ }
+ }
+ catch(DBException& ) {
+ /* this isn't *that* big a deal, but is bad. */
+ log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog;
+ }
+ }
+ }
+ }
+ else {
+ // todo faster...
+ OpDebug debug;
+ updates++;
+ _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() );
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog;
+ warn = true;
+ }
+ }
+
+ removeSavers.clear(); // this effectively closes all of them
+
+ sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates);
+ MemoryMappedFile::flushAll(true);
+ sethbmsg("rollback 6");
+
+ // clean up oplog
+ LOG(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog;
+ // todo: fatal error if this throws?
+ oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false);
+
+ /* reset cached lastoptimewritten and h value */
+ loadLastOpTimeWritten();
+
+ sethbmsg("rollback 7");
+ MemoryMappedFile::flushAll(true);
+
+ // done
+ if( warn )
+ sethbmsg("issues during syncRollback, see log");
+ else
+ sethbmsg("rollback done");
+ }
+
+ void ReplSetImpl::syncRollback(OplogReader&r) {
+ unsigned s = _syncRollback(r);
+ if( s )
+ sleepsecs(s);
+ }
+
+ unsigned ReplSetImpl::_syncRollback(OplogReader&r) {
+ assert( !lockedByMe() );
+ assert( !d.dbMutex.atLeastReadLocked() );
+
+ sethbmsg("rollback 0");
+
+ writelocktry lk(rsoplog, 20000);
+ if( !lk.got() ) {
+ sethbmsg("rollback couldn't get write lock in a reasonable time");
+ return 2;
+ }
+
+ if( state().secondary() ) {
+ /* by doing this, we will not service reads (return an error as we aren't in secondary state.
+ that perhaps is moot because of the write lock above, but that write lock probably gets deferred
+ or removed or yielded later anyway.
+
+ also, this is better for status reporting - we know what is happening.
+ */
+ changeState(MemberState::RS_ROLLBACK);
+ }
+
+ HowToFixUp how;
+ sethbmsg("rollback 1");
+ {
+ r.resetCursor();
+
+ sethbmsg("rollback 2 FindCommonPoint");
+ try {
+ syncRollbackFindCommonPoint(r.conn(), how);
+ }
+ catch( const char *p ) {
+ sethbmsg(string("rollback 2 error ") + p);
+ return 10;
+ }
+ catch( rsfatal& ) {
+ _fatal();
+ return 2;
+ }
+ catch( DBException& e ) {
+ sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min");
+ dbtemprelease r;
+ sleepsecs(60);
+ throw;
+ }
+ }
+
+ sethbmsg("replSet rollback 3 fixup");
+
+ {
+ incRBID();
+ try {
+ syncFixUp(how, r);
+ }
+ catch( rsfatal& ) {
+ sethbmsg("rollback fixup error");
+ _fatal();
+ return 2;
+ }
+ catch(...) {
+ incRBID(); throw;
+ }
+ incRBID();
+
+ /* success - leave "ROLLBACK" state
+ can go to SECONDARY once minvalid is achieved
+ */
+ changeState(MemberState::RS_RECOVERING);
+ }
+
+ return 0;
+ }
+
+}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
new file mode 100644
index 00000000000..8bac981d951
--- /dev/null
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -0,0 +1,701 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "rs.h"
+#include "../repl.h"
+#include "connections.h"
+
+namespace mongo {
+
+ using namespace bson;
+ extern unsigned replSetForceInitialSyncFailure;
+
+ void NOINLINE_DECL blank(const BSONObj& o) {
+ if( *o.getStringField("op") != 'n' ) {
+ log() << "replSet skipping bad op in oplog: " << o.toString() << rsLog;
+ }
+ }
+
+ /* apply the log op that is in param o
+ @return bool success (true) or failure (false)
+ */
+ bool replset::SyncTail::syncApply(const BSONObj &o) {
+ const char *ns = o.getStringField("ns");
+ if ( *ns == '.' || *ns == 0 ) {
+ blank(o);
+ return true;
+ }
+
+ Client::Context ctx(ns);
+ ctx.getClient()->curop()->reset();
+ return !applyOperation_inlock(o);
+ }
+
+ /* initial oplog application, during initial sync, after cloning.
+ @return false on failure.
+ this method returns an error and doesn't throw exceptions (i think).
+ */
+ bool ReplSetImpl::initialSyncOplogApplication(const OpTime& applyGTE, const OpTime& minValid) {
+ Member *source = 0;
+ OplogReader r;
+
+ // keep trying to initial sync from oplog until we run out of targets
+ while ((source = _getOplogReader(r, applyGTE)) != 0) {
+ replset::InitialSync init(source->fullName());
+ if (init.oplogApplication(r, source, applyGTE, minValid)) {
+ return true;
+ }
+
+ r.resetConnection();
+ veto(source->fullName(), 60);
+ log() << "replSet applying oplog from " << source->fullName() << " failed, trying again" << endl;
+ }
+
+ log() << "replSet initial sync error: couldn't find oplog to sync from" << rsLog;
+ return false;
+ }
+
+ bool replset::InitialSync::oplogApplication(OplogReader& r, const Member* source,
+ const OpTime& applyGTE, const OpTime& minValid) {
+
+ const string hn = source->fullName();
+ try {
+ r.tailingQueryGTE( rsoplog, applyGTE );
+ if ( !r.haveCursor() ) {
+ log() << "replSet initial sync oplog query error" << rsLog;
+ return false;
+ }
+
+ {
+ if( !r.more() ) {
+ sethbmsg("replSet initial sync error reading remote oplog");
+ log() << "replSet initial sync error remote oplog (" << rsoplog << ") on host " << hn << " is empty?" << rsLog;
+ return false;
+ }
+ bo op = r.next();
+ OpTime t = op["ts"]._opTime();
+ r.putBack(op);
+
+ if( op.firstElementFieldName() == string("$err") ) {
+ log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog;
+ return false;
+ }
+
+ uassert( 13508 , str::stream() << "no 'ts' in first op in oplog: " << op , !t.isNull() );
+ if( t > applyGTE ) {
+ sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync");
+ log() << "replSet initial sync expected first optime of " << applyGTE << rsLog;
+ log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog;
+ return false;
+ }
+
+ sethbmsg(str::stream() << "initial oplog application from " << hn << " starting at "
+ << t.toStringPretty() << " to " << minValid.toStringPretty());
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet initial sync failing: " << e.toString() << rsLog;
+ return false;
+ }
+
+ /* we lock outside the loop to avoid the overhead of locking on every operation. */
+ writelock lk("");
+
+ // todo : use exhaust
+ OpTime ts;
+ time_t start = time(0);
+ unsigned long long n = 0;
+ int fails = 0;
+ while( ts < minValid ) {
+ try {
+ // There are some special cases with initial sync (see the catch block), so we
+ // don't want to break out of this while until we've reached minvalid. Thus, we'll
+ // keep trying to requery.
+ if( !r.more() ) {
+ OCCASIONALLY log() << "replSet initial sync oplog: no more records" << endl;
+ sleepsecs(1);
+
+ r.resetCursor();
+ r.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten);
+ if ( !r.haveCursor() ) {
+ if (fails++ > 30) {
+ log() << "replSet initial sync tried to query oplog 30 times, giving up" << endl;
+ return false;
+ }
+ }
+
+ continue;
+ }
+
+ BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
+ ts = o["ts"]._opTime();
+
+ {
+ if( (source->state() != MemberState::RS_PRIMARY &&
+ source->state() != MemberState::RS_SECONDARY) ||
+ replSetForceInitialSyncFailure ) {
+
+ int f = replSetForceInitialSyncFailure;
+ if( f > 0 ) {
+ replSetForceInitialSyncFailure = f-1;
+ log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog;
+ throw DBException("forced error",0);
+ }
+ log() << "replSet we are now primary" << rsLog;
+ throw DBException("primary changed",0);
+ }
+
+ applyOp(o, applyGTE);
+ }
+
+ if ( ++n % 1000 == 0 ) {
+ time_t now = time(0);
+ if (now - start > 10) {
+ // simple progress metering
+ log() << "replSet initialSyncOplogApplication applied " << n << " operations, synced to "
+ << ts.toStringPretty() << rsLog;
+ start = now;
+ }
+ }
+
+ getDur().commitIfNeeded();
+ }
+ catch (DBException& e) {
+ // Skip duplicate key exceptions.
+ // These are relatively common on initial sync: if a document is inserted
+ // early in the clone step, the insert will be replayed but the document
+ // will probably already have been cloned over.
+ if( e.getCode() == 11000 || e.getCode() == 11001 || e.getCode() == 12582) {
+ continue;
+ }
+
+ // handle cursor not found (just requery)
+ if( e.getCode() == 13127 ) {
+ log() << "replSet requerying oplog after cursor not found condition, ts: " << ts.toStringPretty() << endl;
+ r.resetCursor();
+ r.tailingQueryGTE(rsoplog, ts);
+ if( r.haveCursor() ) {
+ continue;
+ }
+ }
+
+ // TODO: handle server restart
+
+ if( ts <= minValid ) {
+ // didn't make it far enough
+ log() << "replSet initial sync failing, error applying oplog : " << e.toString() << rsLog;
+ return false;
+ }
+
+ // otherwise, whatever, we'll break out of the loop and catch
+ // anything that's really wrong in syncTail
+ }
+ }
+ return true;
+ }
+
+ void replset::InitialSync::applyOp(const BSONObj& o, const OpTime& applyGTE) {
+ OpTime ts = o["ts"]._opTime();
+
+ // optimes before we started copying need not be applied.
+ if( ts >= applyGTE ) {
+ if (!syncApply(o)) {
+ if (shouldRetry(o)) {
+ uassert(15915, "replSet update still fails after adding missing object", syncApply(o));
+ }
+ }
+ }
+
+ // with repl sets we write the ops to our oplog, too
+ _logOpObjRS(o);
+ }
+
+ /* should be in RECOVERING state on arrival here.
+ readlocks
+ @return true if transitioned to SECONDARY
+ */
+ bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
+ bool golive = false;
+
+ {
+ lock lk( this );
+
+ if (_maintenanceMode > 0) {
+ // we're not actually going live
+ return true;
+ }
+ }
+
+ {
+ readlock lk("local.replset.minvalid");
+ BSONObj mv;
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ minvalid = mv["ts"]._opTime();
+ if( minvalid <= lastOpTimeWritten ) {
+ golive=true;
+ }
+ }
+ else
+ golive = true; /* must have been the original member */
+ }
+ if( golive ) {
+ sethbmsg("");
+ changeState(MemberState::RS_SECONDARY);
+ }
+ return golive;
+ }
+
+ bool ReplSetImpl::_isStale(OplogReader& r, const OpTime& startTs, BSONObj& remoteOldestOp) {
+ remoteOldestOp = r.findOne(rsoplog, Query());
+ OpTime remoteTs = remoteOldestOp["ts"]._opTime();
+ DEV log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+ else LOG(3) << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+ DEV {
+ log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet our state: " << state().toString() << rsLog;
+ }
+ if( startTs >= remoteTs ) {
+ return false;
+ }
+
+ return true;
+ }
+
+ Member* ReplSetImpl::_getOplogReader(OplogReader& r, const OpTime& minTS) {
+ Member *target = 0, *stale = 0;
+ BSONObj oldest;
+
+ assert(r.conn() == 0);
+
+ while ((target = getMemberToSyncTo()) != 0) {
+ string current = target->fullName();
+
+ if( !r.connect(current) ) {
+ log(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
+ r.resetConnection();
+ veto(current);
+ continue;
+ }
+
+ if( !minTS.isNull() && _isStale(r, minTS, oldest) ) {
+ r.resetConnection();
+ veto(current, 600);
+ stale = target;
+ continue;
+ }
+
+ // if we made it here, the target is up and not stale
+ return target;
+ }
+
+ // the only viable sync target was stale
+ if (stale) {
+ log() << "replSet error RS102 too stale to catch up, at least from " << stale->fullName() << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet oldest at " << stale->fullName() << " : " << oldest["ts"]._opTime().toStringLong() << rsLog;
+ log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
+
+ // reset minvalid so that we can't become primary prematurely
+ {
+ writelock lk("local.replset.minvalid");
+ Helpers::putSingleton("local.replset.minvalid", oldest);
+ }
+
+ sethbmsg("error RS102 too stale to catch up");
+ changeState(MemberState::RS_RECOVERING);
+ sleepsecs(120);
+ }
+
+ return 0;
+ }
+
+ /* tail an oplog. ok to return, will be re-called. */
+ void ReplSetImpl::syncTail() {
+ // todo : locking vis a vis the mgr...
+ OplogReader r;
+ string hn;
+
+ // find a target to sync from the last op time written
+ Member* target = _getOplogReader(r, lastOpTimeWritten);
+
+ // no server found
+ if (target == 0) {
+ // if there is no one to sync from
+ OpTime minvalid;
+ tryToGoLiveAsASecondary(minvalid);
+ return;
+ }
+
+ r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
+ // if target cut connections between connecting and querying (for
+ // example, because it stepped down) we might not have a cursor
+ if ( !r.haveCursor() ) {
+ return;
+ }
+
+ uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );
+
+ {
+ if( !r.more() ) {
+ /* maybe we are ahead and need to roll back? */
+ try {
+ bo theirLastOp = r.getLastOp(rsoplog);
+ if( theirLastOp.isEmpty() ) {
+ log() << "replSet error empty query result from " << hn << " oplog" << rsLog;
+ sleepsecs(2);
+ return;
+ }
+ OpTime theirTS = theirLastOp["ts"]._opTime();
+ if( theirTS < lastOpTimeWritten ) {
+ log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
+ syncRollback(r);
+ return;
+ }
+ /* we're not ahead? maybe our new query got fresher data. best to come back and try again */
+ log() << "replSet syncTail condition 1" << rsLog;
+ sleepsecs(1);
+ }
+ catch(DBException& e) {
+ log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog;
+ veto(target->fullName());
+ sleepsecs(2);
+ }
+ return;
+ }
+
+ BSONObj o = r.nextSafe();
+ OpTime ts = o["ts"]._opTime();
+ long long h = o["h"].numberLong();
+ if( ts != lastOpTimeWritten || h != lastH ) {
+ log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << rsLog;
+ log() << "replset source's GTE: " << ts.toStringPretty() << rsLog;
+ syncRollback(r);
+ return;
+ }
+ }
+
+ /* we have now checked if we need to rollback and we either don't have to or did it. */
+ {
+ OpTime minvalid;
+ tryToGoLiveAsASecondary(minvalid);
+ }
+
+ while( 1 ) {
+ {
+ Timer timeInWriteLock;
+ writelock lk("");
+ while( 1 ) {
+ if( !r.moreInCurrentBatch() ) {
+ dbtemprelease tempRelease;
+ {
+ // we need to occasionally check some things. between
+ // batches is probably a good time.
+ if( state().recovering() ) { // perhaps we should check this earlier? but not before the rollback checks.
+ /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
+ OpTime minvalid;
+ bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid);
+ if( golive ) {
+ ;
+ }
+ else {
+ sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString());
+ }
+ // todo: too stale capability
+ }
+ if( !target->hbinfo().hbstate.readable() ) {
+ return;
+ }
+ }
+ r.more(); // to make the requestmore outside the db lock, which obviously is quite important
+ }
+ if( timeInWriteLock.micros() > 1000 ) {
+ dbtemprelease tempRelease;
+ timeInWriteLock.reset();
+ }
+ if( !r.more() )
+ break;
+ {
+ BSONObj o = r.nextSafe(); // note we might get "not master" at some point
+
+ int sd = myConfig().slaveDelay;
+ // ignore slaveDelay if the box is still initializing. once
+ // it becomes secondary we can worry about it.
+ if( sd && box.getState().secondary() ) {
+ const OpTime ts = o["ts"]._opTime();
+ long long a = ts.getSecs();
+ long long b = time(0);
+ long long lag = b - a;
+ long long sleeptime = sd - lag;
+ if( sleeptime > 0 ) {
+ dbtemprelease tempRelease;
+ uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000);
+ if( sleeptime < 60 ) {
+ sleepsecs((int) sleeptime);
+ }
+ else {
+ log() << "replSet slavedelay sleep long time: " << sleeptime << rsLog;
+ // sleep(hours) would prevent reconfigs from taking effect & such!
+ long long waitUntil = b + sleeptime;
+ while( 1 ) {
+ sleepsecs(6);
+ if( time(0) >= waitUntil )
+ break;
+
+ if( !target->hbinfo().hbstate.readable() ) {
+ break;
+ }
+
+ if( myConfig().slaveDelay != sd ) // reconf
+ break;
+ }
+ }
+ }
+ } // endif slaveDelay
+
+ d.dbMutex.assertWriteLocked();
+ try {
+ /* if we have become primary, we dont' want to apply things from elsewhere
+ anymore. assumePrimary is in the db lock so we are safe as long as
+ we check after we locked above. */
+ if( box.getState().primary() ) {
+ log(0) << "replSet stopping syncTail we are now primary" << rsLog;
+ return;
+ }
+
+ // TODO: make this whole method a member of SyncTail (SERVER-4444)
+ replset::SyncTail tail("");
+ tail.syncApply(o);
+ _logOpObjRS(o); // with repl sets we write the ops to our oplog too
+ }
+ catch (DBException& e) {
+ sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o);
+ veto(target->fullName(), 300);
+ sleepsecs(30);
+ return;
+ }
+ }
+ } // end while
+ } // end writelock scope
+
+ r.tailCheck();
+ if( !r.haveCursor() ) {
+ LOG(1) << "replSet end syncTail pass with " << hn << rsLog;
+ // TODO : reuse our connection to the primary.
+ return;
+ }
+
+ if( !target->hbinfo().hbstate.readable() ) {
+ return;
+ }
+ // looping back is ok because this is a tailable cursor
+ }
+ }
+
+ // One pass of the sync loop: decides whether this member should idle,
+ // run initial sync, or tail the primary's oplog. Called repeatedly by
+ // syncThread(); each early return hands control back to that outer loop.
+ void ReplSetImpl::_syncThread() {
+ // snapshot of the member's current state (primary/secondary/etc.)
+ StateBox::SP sp = box.get();
+ if( sp.state.primary() ) {
+ // primaries don't sync from anyone; back off briefly and re-check
+ sleepsecs(1);
+ return;
+ }
+ if( _blockSync || sp.state.fatal() || sp.state.startup() ) {
+ // syncing is administratively blocked (see blockSync()) or the
+ // member isn't in a state where syncing makes sense yet
+ sleepsecs(5);
+ return;
+ }
+
+ /* do we have anything at all? an empty lastOpTimeWritten means we have
+ never applied an op, so a full initial sync is required first */
+ if( lastOpTimeWritten.isNull() ) {
+ syncDoInitialSync();
+ return; // _syncThread will be recalled, starts from top again in case sync failed.
+ }
+
+ /* we have some data. continue tailing. */
+ syncTail();
+ }
+
+ // Top-level driver for the replica-set sync thread: loops forever,
+ // invoking _syncThread() once per iteration and absorbing any exception
+ // it throws so a transient error never kills the thread. Returns only
+ // when this member should not sync at all (unknown self, or arbiter).
+ void ReplSetImpl::syncThread() {
+ while( 1 ) {
+ // After a reconfig, we may not be in the replica set anymore, so
+ // check that we are in the set (and not an arbiter) before
+ // trying to sync with other replicas.
+ if( ! _self ) {
+ log() << "replSet warning did not detect own host and port, not syncing, config: " << theReplSet->config() << rsLog;
+ return;
+ }
+ if( myConfig().arbiterOnly ) {
+ return;
+ }
+
+ try {
+ _syncThread();
+ }
+ catch(DBException& e) {
+ // expected failure mode (e.g. sync source went away): report via
+ // heartbeat message and retry after a longer pause
+ sethbmsg(str::stream() << "syncThread: " << e.toString());
+ sleepsecs(10);
+ }
+ catch(...) {
+ // unknown failure: be conservative and wait longer before retrying
+ sethbmsg("unexpected exception in syncThread()");
+ // TODO : SET NOT SECONDARY here?
+ sleepsecs(60);
+ }
+ // small delay between passes even on success, to avoid a hot loop
+ sleepsecs(1);
+
+ /* normally msgCheckNewState gets called periodically, but in a single node repl set there
+ are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton
+ member has done a stepDown() and needs to come back up.
+ */
+ OCCASIONALLY {
+ mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+ }
+
+ // Thread entry point: sets up the client context for the (single) replica
+ // set sync thread and runs theReplSet->syncThread() until it returns.
+ void startSyncThread() {
+ // guard against accidentally spawning a second sync thread
+ static int n;
+ if( n != 0 ) {
+ log() << "replSet ERROR : more than one sync thread?" << rsLog;
+ assert( n == 0 );
+ }
+ n++;
+
+ Client::initThread("rsSync");
+ cc().iAmSyncThread(); // for isSyncThread() (not used much; used in the secondary create-index code)
+ replLocalAuth();
+ theReplSet->syncThread();
+ cc().shutdown();
+ }
+
+ // Thread entry setup for the ghost-sync thread: registers the client
+ // context and performs local auth; the thread's work happens elsewhere.
+ void GhostSync::starting() {
+ Client::initThread("rsGhostSync");
+ replLocalAuth();
+ }
+
+ // Administratively enable/disable syncing. When blocking, also drop to
+ // RECOVERING: _syncThread() checks _blockSync and will refuse to sync,
+ // and without syncing we can never (re-)enter SECONDARY.
+ void ReplSetImpl::blockSync(bool block) {
+ _blockSync = block;
+ if (_blockSync) {
+ // syncing is how we get into SECONDARY state, so we'll be stuck in
+ // RECOVERING until we unblock
+ changeState(MemberState::RS_RECOVERING);
+ }
+ }
+
+ // Register a slave (identified by its rid in `id`) in the ghost cache and
+ // bind it to the replica-set member with the given memberId, so later
+ // updateSlave()/percolate() calls can find it. Idempotent: an already
+ // initialized entry is left untouched.
+ void GhostSync::associateSlave(const BSONObj& id, const int memberId) {
+ const OID rid = id["_id"].OID();
+ // exclusive lock: we may insert into _ghostCache
+ rwlock lk( _lock , true );
+ shared_ptr<GhostSlave> &g = _ghostCache[rid];
+ if( g.get() == 0 ) {
+ g.reset( new GhostSlave() );
+ // sanity bound on cache growth; wassert only warns, doesn't abort
+ wassert( _ghostCache.size() < 10000 );
+ }
+ GhostSlave &slave = *g;
+ if (slave.init) {
+ // already associated on a previous call; nothing to do
+ LOG(1) << "tracking " << slave.slave->h().toString() << " as " << rid << rsLog;
+ return;
+ }
+
+ slave.slave = (Member*)rs->findById(memberId);
+ if (slave.slave != 0) {
+ slave.init = true;
+ }
+ else {
+ log() << "replset couldn't find a slave with id " << memberId
+ << ", not tracking " << rid << rsLog;
+ }
+ }
+
+ // Record the slave's latest replicated optime against its config groups.
+ // Looks the slave up in the ghost cache by rid; silently (with occasional
+ // logging) ignores unknown or uninitialized entries.
+ void GhostSync::updateSlave(const mongo::OID& rid, const OpTime& last) {
+ // shared (read) lock: we only look up in _ghostCache here.
+ // NOTE(review): updateGroups() below looks like a mutation performed
+ // under this read lock — presumably it is safe because it touches
+ // per-slave/group state, not the cache map itself; confirm.
+ rwlock lk( _lock , false );
+ MAP::iterator i = _ghostCache.find( rid );
+ if ( i == _ghostCache.end() ) {
+ OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog;
+ return;
+ }
+
+ GhostSlave& slave = *(i->second);
+ if (!slave.init) {
+ OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
+ return;
+ }
+
+ // NOTE(review): the C-style cast constructs a temporary MemberCfg copy
+ // and calls updateGroups() on it — this only has a lasting effect if
+ // MemberCfg shares its group state by pointer/reference; TODO confirm.
+ ((ReplSetConfig::MemberCfg)slave.slave->config()).updateGroups(last);
+ }
+
+ // Push a tracked slave's replication progress forward ("percolate" it)
+ // by reading the current sync target's oplog on the slave's behalf until
+ // the slave's recorded optime reaches `last`. Any failure is treated as
+ // best-effort: log, reset the cursor/connection, and try again next call.
+ void GhostSync::percolate(const BSONObj& id, const OpTime& last) {
+ const OID rid = id["_id"].OID();
+ GhostSlave* slave;
+ {
+ // shared lock only for the cache lookup; released before any network I/O
+ rwlock lk( _lock , false );
+
+ MAP::iterator i = _ghostCache.find( rid );
+ if ( i == _ghostCache.end() ) {
+ OCCASIONALLY log() << "couldn't percolate slave " << rid << " no entry" << rsLog;
+ return;
+ }
+
+ slave = i->second.get();
+ if (!slave->init) {
+ OCCASIONALLY log() << "couldn't percolate slave " << rid << " not init" << rsLog;
+ return;
+ }
+ }
+
+ assert(slave->slave);
+
+ // bail if there's no usable sync target: none at all, we're primary,
+ // or the target is the very slave we'd be percolating for
+ const Member *target = rs->_currentSyncTarget;
+ if (!target || rs->box.getState().primary()
+ // we are currently syncing from someone who's syncing from us
+ // the target might end up with a new Member, but s.slave never
+ // changes so we'll compare the names
+ || target == slave->slave || target->fullName() == slave->slave->fullName()) {
+ LOG(1) << "replica set ghost target no good" << endl;
+ return;
+ }
+
+ try {
+ // lazily (re)establish the oplog cursor on the target, resuming
+ // from the requested optime
+ if (!slave->reader.haveCursor()) {
+ if (!slave->reader.connect(id, slave->slave->id(), target->fullName())) {
+ // error message logged in OplogReader::connect
+ return;
+ }
+ slave->reader.ghostQueryGTE(rsoplog, last);
+ }
+
+ LOG(1) << "replSet last: " << slave->last.toString() << " to " << last.toString() << rsLog;
+ if (slave->last > last) {
+ // already ahead of the requested point; nothing to do
+ return;
+ }
+
+ // advance through the oplog until we've covered `last`
+ while (slave->last <= last) {
+ if (!slave->reader.more()) {
+ // we'll be back
+ return;
+ }
+
+ BSONObj o = slave->reader.nextSafe();
+ slave->last = o["ts"]._opTime();
+ }
+ LOG(2) << "now last is " << slave->last.toString() << rsLog;
+ }
+ catch (DBException& e) {
+ // we'll be back
+ LOG(2) << "replSet ghost sync error: " << e.what() << " for "
+ << slave->slave->fullName() << rsLog;
+ // drop the connection so the next call reconnects from scratch
+ slave->reader.resetConnection();
+ }
+ }
+}
diff --git a/src/mongo/db/repl/test.html b/src/mongo/db/repl/test.html
new file mode 100644
index 00000000000..295ad2ef0e0
--- /dev/null
+++ b/src/mongo/db/repl/test.html
@@ -0,0 +1,11 @@
+<HTML>
+<BODY>
+<!-- see also jstests/rs/ -->
+<iframe src="http://127.0.0.1:28000/_replSet" width="100%" height="50%" frameborder=1>
+</iframe>
+
+<iframe src="http://127.0.0.1:28001/_replSet" width="100%" height="50%" frameborder=1>
+</iframe>
+
+</BODY>
+</HTML>
diff --git a/src/mongo/db/repl/testing.js b/src/mongo/db/repl/testing.js
new file mode 100644
index 00000000000..d741cf3a644
--- /dev/null
+++ b/src/mongo/db/repl/testing.js
@@ -0,0 +1,42 @@
+// helpers for testing repl sets
+// run
+// mongo --shell <host:port> testing.js
+
+// sample replica set config using host "dm_hp" (default port + 27002)
+cfg = {
+ _id: 'asdf',
+ members: [
+ { _id : 0, host : "dm_hp" },
+ { _id : 2, host : "dm_hp:27002" }
+ ]
+};
+// alternate sample config for host "dmthink"
+c2 = {
+ _id: 'asdf',
+ members: [
+ { _id: 0, host: "dmthink" },
+ { _id: 2, host: "dmthink:27002" }
+ ]
+};
+
+// work against the admin db; keep a handle to `local` for oplog inspection
+db = db.getSisterDB("admin");
+local = db.getSisterDB("local");
+
+// print a short cheat sheet of the helpers defined below
+print("\n\ndb = admin db on localhost:27017");
+print("b = admin on localhost:27002");
+print("rc(x) = db.runCommand(x)");
+print("cfg = samp replset config");
+print("i() = replSetInitiate(cfg)");
+print("ism() = rc('ismaster')");
+print("\n\n");
+
+// shorthand helpers, one-liners as advertised above
+function rc(c) { return db.runCommand(c); }
+function i() { return rc({ replSetInitiate: cfg }); }
+function ism() { return rc("isMaster"); }
+
+// b is a second connection (admin db on localhost:27002), or 0 if unreachable
+b = 0;
+try {
+ b = new Mongo("localhost:27002").getDB("admin");
+}
+catch (e) {
+ print("\nCouldn't connect to b mongod instance\n");
+}
+