diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/connections.h | 128 | ||||
-rw-r--r-- | src/mongo/db/repl/consensus.cpp | 449 | ||||
-rw-r--r-- | src/mongo/db/repl/health.cpp | 449 | ||||
-rw-r--r-- | src/mongo/db/repl/health.h | 50 | ||||
-rw-r--r-- | src/mongo/db/repl/heartbeat.cpp | 382 | ||||
-rw-r--r-- | src/mongo/db/repl/manager.cpp | 274 | ||||
-rw-r--r-- | src/mongo/db/repl/multicmd.h | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 404 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.cpp | 778 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.h | 667 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_config.cpp | 662 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_config.h | 251 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_exception.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 271 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initiate.cpp | 269 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_member.h | 131 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_optime.h | 58 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 667 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 701 | ||||
-rw-r--r-- | src/mongo/db/repl/test.html | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/testing.js | 42 |
21 files changed, 6736 insertions, 0 deletions
diff --git a/src/mongo/db/repl/connections.h b/src/mongo/db/repl/connections.h new file mode 100644 index 00000000000..3e08f80b047 --- /dev/null +++ b/src/mongo/db/repl/connections.h @@ -0,0 +1,128 @@ +// @file + +/* + * Copyright (C) 2010 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <map> +#include "../../client/dbclient.h" +#include "../security_common.h" + +namespace mongo { + + /** here we keep a single connection (with reconnect) for a set of hosts, + one each, and allow one user at a time per host. if in use already for that + host, we block. so this is an easy way to keep a 1-deep pool of connections + that many threads can share. + + thread-safe. + + Example: + { + ScopedConn c("foo.acme.com:9999"); + c->runCommand(...); + } + + throws exception on connect error (but fine to try again later with a new + scopedconn object for same host). + */ + class ScopedConn { + public: + /** throws assertions if connect failure etc. */ + ScopedConn(string hostport); + ~ScopedConn() { + // conLock releases... + } + void reconnect() { + conn()->port().shutdown(); + connect(); + } + + /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic. + So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes + ScopedConn limited in functionality but very safe. 
More non-cursor wrappers can be added here if needed. + */ + bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) { + return conn()->runCommand(dbname, cmd, info, options); + } + unsigned long long count(const string &ns) { + return conn()->count(ns); + } + BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { + return conn()->findOne(ns, q, fieldsToReturn, queryOptions); + } + + private: + auto_ptr<scoped_lock> connLock; + static mongo::mutex mapMutex; + struct X { + mongo::mutex z; + DBClientConnection cc; + bool connected; + X() : z("X"), cc(/*reconnect*/ true, 0, /*timeout*/ 10.0), connected(false) { + cc._logLevel = 2; + } + } *x; + typedef map<string,ScopedConn::X*> M; + static M& _map; + DBClientConnection* conn() { return &x->cc; } + const string _hostport; + + // we should already be locked... + bool connect() { + string err; + if (!x->cc.connect(_hostport, err)) { + log() << "couldn't connect to " << _hostport << ": " << err << rsLog; + return false; + } + x->connected = true; + + // if we cannot authenticate against a member, then either its key file + // or our key file has to change. if our key file has to change, we'll + // be rebooting. if their file has to change, they'll be rebooted so the + // connection created above will go dead, reconnect, and reauth. 
+ if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) { + log() << "could not authenticate against " << _hostport << ", " << err << rsLog; + return false; + } + + return true; + } + }; + + inline ScopedConn::ScopedConn(string hostport) : _hostport(hostport) { + bool first = false; + { + scoped_lock lk(mapMutex); + x = _map[_hostport]; + if( x == 0 ) { + x = _map[_hostport] = new X(); + first = true; + connLock.reset( new scoped_lock(x->z) ); + } + } + + // Keep trying to connect if we're not yet connected + if( !first && x->connected ) { + connLock.reset( new scoped_lock(x->z) ); + return; + } + + connect(); + } + +} diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp new file mode 100644 index 00000000000..3995373f5ef --- /dev/null +++ b/src/mongo/db/repl/consensus.cpp @@ -0,0 +1,449 @@ +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "../commands.h" +#include "rs.h" +#include "multicmd.h" + +namespace mongo { + + class CmdReplSetFresh : public ReplSetCommand { + public: + CmdReplSetFresh() : ReplSetCommand("replSetFresh") { } + private: + + bool shouldVeto(const BSONObj& cmdObj, string& errmsg) { + unsigned id = cmdObj["id"].Int(); + const Member* primary = theReplSet->box.getPrimary(); + const Member* hopeful = theReplSet->findById(id); + const Member *highestPriority = theReplSet->getMostElectable(); + + if( !hopeful ) { + errmsg = str::stream() << "replSet couldn't find member with id " << id; + return true; + } + else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) { + // hbinfo is not updated, so we have to check the primary's last optime separately + errmsg = str::stream() << "I am already primary, " << hopeful->fullName() << + " can try again once I've stepped down"; + return true; + } + else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) { + // other members might be aware of more up-to-date nodes + errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " << + primary->fullName() << " is already primary and more up-to-date"; + return true; + } + else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) { + errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName(); + return true; + } + + // don't veto older versions + if (cmdObj["id"].eoo()) { + // they won't be looking for the veto field + return false; + } + + if ( !theReplSet->isElectable(id) || + (highestPriority && highestPriority->config().priority > hopeful->config().priority)) { + return true; + } + + return false; + } + + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + + if( cmdObj["set"].String() != 
theReplSet->name() ) { + errmsg = "wrong repl set name"; + return false; + } + string who = cmdObj["who"].String(); + int cfgver = cmdObj["cfgver"].Int(); + OpTime opTime(cmdObj["opTime"].Date()); + + bool weAreFresher = false; + if( theReplSet->config().version > cfgver ) { + log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog; + result.append("info", "config version stale"); + weAreFresher = true; + } + // check not only our own optime, but any other member we can reach + else if( opTime < theReplSet->lastOpTimeWritten || + opTime < theReplSet->lastOtherOpTime()) { + weAreFresher = true; + } + result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); + result.append("fresher", weAreFresher); + result.append("veto", shouldVeto(cmdObj, errmsg)); + + return true; + } + } cmdReplSetFresh; + + class CmdReplSetElect : public ReplSetCommand { + public: + CmdReplSetElect() : ReplSetCommand("replSetElect") { } + private: + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + theReplSet->elect.electCmdReceived(cmdObj, &result); + return true; + } + } cmdReplSetElect; + + int Consensus::totalVotes() const { + static int complain = 0; + int vTot = rs._self->config().votes; + for( Member *m = rs.head(); m; m=m->next() ) + vTot += m->config().votes; + if( vTot % 2 == 0 && vTot && complain++ == 0 ) + log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog; + return vTot; + } + + bool Consensus::aMajoritySeemsToBeUp() const { + int vUp = rs._self->config().votes; + for( Member *m = rs.head(); m; m=m->next() ) + vUp += m->hbinfo().up() ? 
m->config().votes : 0; + return vUp * 2 > totalVotes(); + } + + bool Consensus::shouldRelinquish() const { + int vUp = rs._self->config().votes; + const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries; + for( Member *m = rs.head(); m; m=m->next() ) { + long long dt = m->hbinfo().timeDown(); + if( dt < T ) + vUp += m->config().votes; + } + + // the manager will handle calling stepdown if another node should be + // primary due to priority + + return !( vUp * 2 > totalVotes() ); + } + + static const int VETO = -10000; + + const time_t LeaseTime = 30; + + SimpleMutex Consensus::lyMutex("ly"); + + unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */ + SimpleMutex::scoped_lock lk(lyMutex); + LastYea &L = this->ly.ref(lk); + time_t now = time(0); + if( L.when + LeaseTime >= now && L.who != memberId ) { + LOG(1) << "replSet not voting yea for " << memberId << + " voted for " << L.who << ' ' << now-L.when << " secs ago" << rsLog; + throw VoteException(); + } + L.when = now; + L.who = memberId; + return rs._self->config().votes; + } + + /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in + place instead of leaving it for a long time. + */ + void Consensus::electionFailed(unsigned meid) { + SimpleMutex::scoped_lock lk(lyMutex); + LastYea &L = ly.ref(lk); + DEV assert( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test + if( L.who == meid ) + L.when = 0; + } + + /* todo: threading **************** !!!!!!!!!!!!!!!! 
*/ + void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { + BSONObjBuilder& b = *_b; + DEV log() << "replSet received elect msg " << cmd.toString() << rsLog; + else LOG(2) << "replSet received elect msg " << cmd.toString() << rsLog; + string set = cmd["set"].String(); + unsigned whoid = cmd["whoid"].Int(); + int cfgver = cmd["cfgver"].Int(); + OID round = cmd["round"].OID(); + int myver = rs.config().version; + + const Member* primary = rs.box.getPrimary(); + const Member* hopeful = rs.findById(whoid); + const Member* highestPriority = rs.getMostElectable(); + + int vote = 0; + if( set != rs.name() ) { + log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; + } + else if( myver < cfgver ) { + // we are stale. don't vote + } + else if( myver > cfgver ) { + // they are stale! + log() << "replSet electCmdReceived info got stale version # during election" << rsLog; + vote = -10000; + } + else if( !hopeful ) { + log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog; + vote = -10000; + } + else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) { + // hbinfo is not updated, so we have to check the primary's last optime separately + log() << "I am already primary, " << hopeful->fullName() + << " can try again once I've stepped down" << rsLog; + vote = -10000; + } + else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) { + // other members might be aware of more up-to-date nodes + log() << hopeful->fullName() << " is trying to elect itself but " << + primary->fullName() << " is already primary and more up-to-date" << rsLog; + vote = -10000; + } + else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) { + log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName(); + vote = -10000; + } + else { + try { + vote = yea(whoid); + dassert( 
hopeful->id() == whoid ); + rs.relinquish(); + log() << "replSet info voting yea for " << hopeful->fullName() << " (" << whoid << ')' << rsLog; + } + catch(VoteException&) { + log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog; + } + } + + b.append("vote", vote); + b.append("round", round); + } + + void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) { + configVersion = config().version; + for( Member *m = head(); m; m=m->next() ) + if( m->hbinfo().maybeUp() ) + L.push_back( Target(m->fullName()) ); + } + + /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need + to check later that the config didn't change. */ + void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) { + if( lockedByMe() ) { + _getTargets(L, configVersion); + return; + } + lock lk(this); + _getTargets(L, configVersion); + } + + /* Do we have the newest data of them all? + @param allUp - set to true if all members are up. Only set if true returned. + @return true if we are freshest. Note we may tie. + */ + bool Consensus::weAreFreshest(bool& allUp, int& nTies) { + const OpTime ord = theReplSet->lastOpTimeWritten; + nTies = 0; + assert( !ord.isNull() ); + BSONObj cmd = BSON( + "replSetFresh" << 1 << + "set" << rs.name() << + "opTime" << Date_t(ord.asDate()) << + "who" << rs._self->fullName() << + "cfgver" << rs._cfg->version << + "id" << rs._self->id()); + list<Target> L; + int ver; + /* the following queries arbiters, even though they are never fresh. wonder if that makes sense. + it doesn't, but it could, if they "know" what freshness it one day. so consider removing + arbiters from getTargets() here. although getTargets is used elsewhere for elections; there + arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make + not fetching them herein happen. 
+ */ + rs.getTargets(L, ver); + multiCommand(cmd, L); + int nok = 0; + allUp = true; + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + if( i->ok ) { + nok++; + if( i->result["fresher"].trueValue() ) { + log() << "not electing self, we are not freshest" << rsLog; + return false; + } + OpTime remoteOrd( i->result["opTime"].Date() ); + if( remoteOrd == ord ) + nTies++; + assert( remoteOrd <= ord ); + + if( i->result["veto"].trueValue() ) { + BSONElement msg = i->result["errmsg"]; + if (!msg.eoo()) { + log() << "not electing self, " << i->toHost << " would veto with '" << + msg.String() << "'" << rsLog; + } + else { + log() << "not electing self, " << i->toHost << " would veto" << rsLog; + } + return false; + } + } + else { + DEV log() << "replSet freshest returns " << i->result.toString() << rsLog; + allUp = false; + } + } + LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; + assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working... + return true; + } + + extern time_t started; + + void Consensus::multiCommand(BSONObj cmd, list<Target>& L) { + assert( !rs.lockedByMe() ); + mongo::multiCommand(cmd, L); + } + + void Consensus::_electSelf() { + if( time(0) < steppedDown ) + return; + + { + const OpTime ord = theReplSet->lastOpTimeWritten; + if( ord == 0 ) { + log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog; + return; + } + } + + bool allUp; + int nTies; + if( !weAreFreshest(allUp, nTies) ) { + return; + } + + rs.sethbmsg("",9); + + if( !allUp && time(0) - started < 60 * 5 ) { + /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data + if we don't have to -- we'd rather be offline and wait a little longer instead + todo: make this configurable. 
+ */ + rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes"); + return; + } + + Member& me = *rs._self; + + if( nTies ) { + /* tie? we then randomly sleep to try to not collide on our voting. */ + /* todo: smarter. */ + if( me.id() == 0 || sleptLast ) { + // would be fine for one node not to sleep + // todo: biggest / highest priority nodes should be the ones that get to not sleep + } + else { + assert( !rs.lockedByMe() ); // bad to go to sleep locked + unsigned ms = ((unsigned) rand()) % 1000 + 50; + DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog; + sleptLast = true; + sleepmillis(ms); + throw RetryAfterSleepException(); + } + } + sleptLast = false; + + time_t start = time(0); + unsigned meid = me.id(); + int tally = yea( meid ); + bool success = false; + try { + log() << "replSet info electSelf " << meid << rsLog; + + BSONObj electCmd = BSON( + "replSetElect" << 1 << + "set" << rs.name() << + "who" << me.fullName() << + "whoid" << me.hbinfo().id() << + "cfgver" << rs._cfg->version << + "round" << OID::gen() /* this is just for diagnostics */ + ); + + int configVersion; + list<Target> L; + rs.getTargets(L, configVersion); + multiCommand(electCmd, L); + + { + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + DEV log() << "replSet elect res: " << i->result.toString() << rsLog; + if( i->ok ) { + int v = i->result["vote"].Int(); + tally += v; + } + } + if( tally*2 <= totalVotes() ) { + log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog; + } + else if( time(0) - start > 30 ) { + // defensive; should never happen as we have timeouts on connection and operation for our conn + log() << "replSet too much time passed during our election, ignoring result" << rsLog; + } + else if( configVersion != rs.config().version ) { + log() << "replSet config version changed during our election, ignoring result" << rsLog; + } + else { + /* succeeded. 
*/ + log(1) << "replSet election succeeded, assuming primary role" << rsLog; + success = true; + rs.assumePrimary(); + } + } + } + catch( std::exception& ) { + if( !success ) electionFailed(meid); + throw; + } + if( !success ) electionFailed(meid); + } + + void Consensus::electSelf() { + assert( !rs.lockedByMe() ); + assert( !rs.myConfig().arbiterOnly ); + assert( rs.myConfig().slaveDelay == 0 ); + try { + _electSelf(); + } + catch(RetryAfterSleepException&) { + throw; + } + catch(VoteException& ) { + log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog; + } + catch(DBException& e) { + log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog; + } + catch(...) { + log() << "replSet warning caught unexpected exception in electSelf()" << rsLog; + } + } + +} diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp new file mode 100644 index 00000000000..0b7ed87eac3 --- /dev/null +++ b/src/mongo/db/repl/health.cpp @@ -0,0 +1,449 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "rs.h" +#include "health.h" +#include "../../util/background.h" +#include "../../client/dbclient.h" +#include "../../client/connpool.h" +#include "../commands.h" +#include "../../util/concurrency/value.h" +#include "../../util/concurrency/task.h" +#include "../../util/mongoutils/html.h" +#include "../../util/goodies.h" +#include "../../util/ramlog.h" +#include "../helpers/dblogger.h" +#include "connections.h" +#include "../../util/unittest.h" +#include "../dbhelpers.h" + +namespace mongo { + /* decls for connections.h */ + ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); + mutex ScopedConn::mapMutex("ScopedConn::mapMutex"); +} + +namespace mongo { + + using namespace mongoutils::html; + using namespace bson; + + static RamLog * _rsLog = new RamLog( "rs" ); + Tee *rsLog = _rsLog; + extern bool replSetBlind; // for testing + + string ago(time_t t) { + if( t == 0 ) return ""; + + time_t x = time(0) - t; + stringstream s; + if( x < 180 ) { + s << x << " sec"; + if( x != 1 ) s << 's'; + } + else if( x < 3600 ) { + s.precision(2); + s << x / 60.0 << " mins"; + } + else { + s.precision(2); + s << x / 3600.0 << " hrs"; + } + return s.str(); + } + + void Member::summarizeMember(stringstream& s) const { + s << tr(); + { + stringstream u; + u << "http://" << h().host() << ':' << (h().port() + 1000) << "/_replSet"; + s << td( a(u.str(), "", fullName()) ); + } + s << td( id() ); + double h = hbinfo().health; + bool ok = h > 0; + s << td(red(str::stream() << h,h == 0)); + s << td(ago(hbinfo().upSince)); + bool never = false; + { + string h; + time_t hb = hbinfo().lastHeartbeat; + if( hb == 0 ) { + h = "never"; + never = true; + } + else h = ago(hb) + " ago"; + s << td(h); + } + s << td(config().votes); + s << td(config().priority); + { + string stateText = state().toString(); + if( _config.hidden ) + stateText += " (hidden)"; + if( ok || stateText.empty() ) + s << td(stateText); // text blank if we've never connected + else + s << td( 
grey(str::stream() << "(was " << state().toString() << ')', true) ); + } + s << td( grey(hbinfo().lastHeartbeatMsg,!ok) ); + stringstream q; + q << "/_replSetOplog?_id=" << id(); + s << td( a(q.str(), "", never ? "?" : hbinfo().opTime.toString()) ); + if( hbinfo().skew > INT_MIN ) { + s << td( grey(str::stream() << hbinfo().skew,!ok) ); + } + else + s << td(""); + s << _tr(); + } + + string ReplSetImpl::stateAsHtml(MemberState s) { + if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP"); + if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY"); + if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY"); + if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING"); + if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. 
fatal error.", "FATAL"); + if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2"); + if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER"); + if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN"); + if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK"); + return ""; + } + + extern time_t started; + + // oplogdiags in web ui + static void say(stringstream&ss, const bo& op) { + ss << "<tr>"; + + set<string> skip; + be e = op["ts"]; + if( e.type() == Date || e.type() == Timestamp ) { + OpTime ot = e._opTime(); + ss << td( time_t_to_String_short( ot.getSecs() ) ); + ss << td( ot.toString() ); + skip.insert("ts"); + } + else ss << td("?") << td("?"); + + e = op["h"]; + if( e.type() == NumberLong ) { + ss << "<td>" << hex << e.Long() << "</td>\n"; + skip.insert("h"); + } + else + ss << td("?"); + + ss << td(op["op"].valuestrsafe()); + ss << td(op["ns"].valuestrsafe()); + skip.insert("op"); + skip.insert("ns"); + + ss << "<td>"; + for( bo::iterator i(op); i.more(); ) { + be e = i.next(); + if( skip.count(e.fieldName()) ) continue; + ss << e.toString() << ' '; + } + ss << "</td></tr>\n"; + } + + void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { + const Member *m = findById(server_id); + if( m == 0 ) { + ss << "Error : can't find a member with id: " << server_id << '\n'; + return; + } + + ss << p("Server : " + m->fullName() + "<br>ns : " + rsoplog ); + + //const bo fields = BSON( "o" << false << "o2" << false ); + const bo fields; + + /** todo fix we might want an so timeout here */ + DBClientConnection conn(false, 0, /*timeout*/ 20); + { + string errmsg; + if( !conn.connect(m->fullName(), errmsg) ) { + ss << "couldn't connect to " << m->fullName() << ' ' << errmsg; + return; + } + } + + auto_ptr<DBClientCursor> c = conn.query(rsoplog, 
Query().sort("$natural",1), 20, 0, &fields); + if( c.get() == 0 ) { + ss << "couldn't query " << rsoplog; + return; + } + static const char *h[] = {"ts","optime", "h","op","ns","rest",0}; + + ss << "<style type=\"text/css\" media=\"screen\">" + "table { font-size:75% }\n" + // "th { background-color:#bbb; color:#000 }\n" + // "td,th { padding:.25em }\n" + "</style>\n"; + + ss << table(h, true); + //ss << "<pre>\n"; + int n = 0; + OpTime otFirst; + OpTime otLast; + OpTime otEnd; + while( c->more() ) { + bo o = c->next(); + otLast = o["ts"]._opTime(); + if( otFirst.isNull() ) + otFirst = otLast; + say(ss, o); + n++; + } + if( n == 0 ) { + ss << rsoplog << " is empty\n"; + } + else { + auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields); + if( c.get() == 0 ) { + ss << "couldn't query [2] " << rsoplog; + return; + } + string x; + bo o = c->next(); + otEnd = o["ts"]._opTime(); + while( 1 ) { + stringstream z; + if( o["ts"]._opTime() == otLast ) + break; + say(z, o); + x = z.str() + x; + if( !c->more() ) + break; + o = c->next(); + } + if( !x.empty() ) { + ss << "<tr><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n" << x; + //ss << "\n...\n\n" << x; + } + } + ss << _table(); + ss << p(time_t_to_String_short(time(0)) + " current time"); + + if( !otEnd.isNull() ) { + ss << "<p>Log length in time: "; + unsigned d = otEnd.getSecs() - otFirst.getSecs(); + double h = d / 3600.0; + ss.precision(3); + if( h < 72 ) + ss << h << " hours"; + else + ss << h / 24.0 << " days"; + ss << "</p>\n"; + } + } + + void ReplSetImpl::_summarizeAsHtml(stringstream& s) const { + s << table(0, false); + s << tr("Set name:", _name); + s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" ); + s << _table(); + + const char *h[] = {"Member", + "<a title=\"member id in the replset config\">id</a>", + "Up", + "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows 
uptime)\">cctime</a>", + "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>", + "Votes", "Priority", "State", "Messages", + "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>", + "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>", + 0 + }; + s << table(h); + + /* this is to sort the member rows by their ordinal _id, so they show up in the same + order on all the different web ui's; that is less confusing for the operator. */ + map<int,string> mp; + + string myMinValid; + try { + readlocktry lk("local.replset.minvalid", 300); + if( lk.got() ) { + BSONObj mv; + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + myMinValid = "minvalid:" + mv["ts"]._opTime().toString(); + } + } + else myMinValid = "."; + } + catch(...) { + myMinValid = "exception fetching minvalid"; + } + + const Member *_self = this->_self; + assert(_self); + { + stringstream s; + /* self row */ + s << tr() << td(_self->fullName() + " (me)") << + td(_self->id()) << + td("1") << //up + td(ago(started)) << + td("") << // last heartbeat + td(ToString(_self->config().votes)) << + td(ToString(_self->config().priority)) << + td( stateAsHtml(box.getState()) + (_self->config().hidden?" 
(hidden)":"") ); + s << td( _hbmsg ); + stringstream q; + q << "/_replSetOplog?_id=" << _self->id(); + s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) ); + s << td(""); // skew + s << _tr(); + mp[_self->hbinfo().id()] = s.str(); + } + Member *m = head(); + while( m ) { + stringstream s; + m->summarizeMember(s); + mp[m->hbinfo().id()] = s.str(); + m = m->next(); + } + + for( map<int,string>::const_iterator i = mp.begin(); i != mp.end(); i++ ) + s << i->second; + s << _table(); + } + + + void fillRsLog(stringstream& s) { + _rsLog->toHTML( s ); + } + + const Member* ReplSetImpl::findById(unsigned id) const { + if( _self && id == _self->id() ) return _self; + + for( Member *m = head(); m; m = m->next() ) + if( m->id() == id ) + return m; + return 0; + } + + const OpTime ReplSetImpl::lastOtherOpTime() const { + OpTime closest(0,0); + + for( Member *m = _members.head(); m; m=m->next() ) { + if (!m->hbinfo().up()) { + continue; + } + + if (m->hbinfo().opTime > closest) { + closest = m->hbinfo().opTime; + } + } + + return closest; + } + + void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { + vector<BSONObj> v; + + const Member *_self = this->_self; + assert( _self ); + + MemberState myState = box.getState(); + + // add self + { + BSONObjBuilder bb; + bb.append("_id", (int) _self->id()); + bb.append("name", _self->fullName()); + bb.append("health", 1.0); + bb.append("state", (int)myState.s); + bb.append("stateStr", myState.toString()); + bb.append("uptime", (unsigned)(time(0) - cmdLine.started)); + if (!_self->config().arbiterOnly) { + bb.appendTimestamp("optime", lastOpTimeWritten.asDate()); + bb.appendDate("optimeDate", lastOpTimeWritten.getSecs() * 1000LL); + } + + int maintenance = _maintenanceMode; + if (maintenance) { + bb.append("maintenanceMode", maintenance); + } + + if (theReplSet) { + string s = theReplSet->hbmsg(); + if( !s.empty() ) + bb.append("errmsg", s); + } + bb.append("self", true); + v.push_back(bb.obj()); + } + + 
Member *m =_members.head(); + while( m ) { + BSONObjBuilder bb; + bb.append("_id", (int) m->id()); + bb.append("name", m->fullName()); + double h = m->hbinfo().health; + bb.append("health", h); + bb.append("state", (int) m->state().s); + if( h == 0 ) { + // if we can't connect the state info is from the past and could be confusing to show + bb.append("stateStr", "(not reachable/healthy)"); + } + else { + bb.append("stateStr", m->state().toString()); + } + bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0)); + if (!m->config().arbiterOnly) { + bb.appendTimestamp("optime", m->hbinfo().opTime.asDate()); + bb.appendDate("optimeDate", m->hbinfo().opTime.getSecs() * 1000LL); + } + bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat); + bb.append("pingMs", m->hbinfo().ping); + string s = m->lhb(); + if( !s.empty() ) + bb.append("errmsg", s); + + if (m->hbinfo().authIssue) { + bb.append("authenticated", false); + } + + v.push_back(bb.obj()); + m = m->next(); + } + sort(v.begin(), v.end()); + b.append("set", name()); + b.appendTimeT("date", time(0)); + b.append("myState", myState.s); + const Member *syncTarget = _currentSyncTarget; + if (syncTarget && myState != MemberState::RS_PRIMARY) { + b.append("syncingTo", syncTarget->fullName()); + } + b.append("members", v); + if( replSetBlind ) + b.append("blind",true); // to avoid confusion if set...normally never set except for testing. + } + + static struct Test : public UnitTest { + void run() { + HealthOptions a,b; + assert( a == b ); + assert( a.isDefault() ); + } + } test; + +} diff --git a/src/mongo/db/repl/health.h b/src/mongo/db/repl/health.h new file mode 100644 index 00000000000..55cca93a27e --- /dev/null +++ b/src/mongo/db/repl/health.h @@ -0,0 +1,50 @@ +// replset.h + +/** +* Copyright (C) 2008 10gen Inc. 
+* 
+* This program is free software: you can redistribute it and/or modify 
+* it under the terms of the GNU Affero General Public License, version 3, 
+* as published by the Free Software Foundation. 
+* 
+* This program is distributed in the hope that it will be useful, 
+* but WITHOUT ANY WARRANTY; without even the implied warranty of 
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
+* GNU Affero General Public License for more details. 
+* 
+* You should have received a copy of the GNU Affero General Public License 
+* along with this program.  If not, see <http://www.gnu.org/licenses/>. 
+*/ 
+ 
+#pragma once 
+ 
+namespace mongo { 
+ 
+    /* issue a replSetHeartbeat command to memberFullName and place the response 
+       in 'result'.  checkEmpty asks the remote to report whether it has any 
+       data (response field "hasData").  returns the command's ok status. 
+       NOTE(review): theirConfigVersion appears intended to carry back the 
+       remote config version -- confirm against the impl in heartbeat.cpp. 
+       throws */ 
+    bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false); 
+ 
+    /* tuning knobs for the heartbeat monitoring threads. */ 
+    struct HealthOptions { 
+        HealthOptions() : 
+            heartbeatSleepMillis(2000), 
+            heartbeatTimeoutMillis( 10000 ), 
+            heartbeatConnRetries(2) 
+        { } 
+ 
+        /* true when every option still has its constructor default. */ 
+        bool isDefault() const { return *this == HealthOptions(); } 
+ 
+        // see http://www.mongodb.org/display/DOCS/Replica+Set+Internals 
+        unsigned heartbeatSleepMillis;   // pause between heartbeat rounds (ms) -- by name; default 2000 
+        unsigned heartbeatTimeoutMillis; // how long to wait for a heartbeat response (ms) -- by name; default 10000 
+        unsigned heartbeatConnRetries ;  // reconnect attempts; note: not validated by check() 
+ 
+        /* sanity-check the millisecond settings; uassert throws if either is < 10ms. */ 
+        void check() { 
+            uassert(13112, "bad replset heartbeat option", heartbeatSleepMillis >= 10); 
+            uassert(13113, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10); 
+        } 
+ 
+        bool operator==(const HealthOptions& r) const { 
+            return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==r.heartbeatConnRetries; 
+        } 
+    }; 
+ 
+} 
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp 
new file mode 100644 
index 00000000000..331812af85a 
--- /dev/null 
+++ b/src/mongo/db/repl/heartbeat.cpp 
@@ -0,0 +1,382 @@ 
+/** 
+* Copyright (C) 2008 10gen Inc. 
+* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "rs.h" +#include "health.h" +#include "../../util/background.h" +#include "../../client/dbclient.h" +#include "../commands.h" +#include "../../util/concurrency/value.h" +#include "../../util/concurrency/task.h" +#include "../../util/concurrency/msg.h" +#include "../../util/mongoutils/html.h" +#include "../../util/goodies.h" +#include "../../util/ramlog.h" +#include "../helpers/dblogger.h" +#include "connections.h" +#include "../../util/unittest.h" +#include "../instance.h" +#include "../repl.h" + +namespace mongo { + + using namespace bson; + + extern bool replSetBlind; + extern ReplSettings replSettings; + + unsigned int HeartbeatInfo::numPings; + + long long HeartbeatInfo::timeDown() const { + if( up() ) return 0; + if( downSince == 0 ) + return 0; // still waiting on first heartbeat + return jsTime() - downSince; + } + + /* { replSetHeartbeat : <setname> } */ + class CmdReplSetHeartbeat : public ReplSetCommand { + public: + CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( replSetBlind ) { + if (theReplSet) { + errmsg = str::stream() << theReplSet->selfFullName() << " is blind"; + } + return false; + } + + /* we don't call ReplSetCommand::check() here because heartbeat + checks many things that are 
pre-initialization. */ + if( !replSet ) { + errmsg = "not running with --replSet"; + return false; + } + + if (!checkAuth(errmsg, result)) { + return false; + } + + /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ + { + AbstractMessagingPort *mp = cc().port(); + if( mp ) + mp->tag |= 1; + } + + if( cmdObj["pv"].Int() != 1 ) { + errmsg = "incompatible replset protocol version"; + return false; + } + { + string s = string(cmdObj.getStringField("replSetHeartbeat")); + if( cmdLine.ourSetName() != s ) { + errmsg = "repl set names do not match"; + log() << "replSet set names do not match, our cmdline: " << cmdLine._replSet << rsLog; + log() << "replSet s: " << s << rsLog; + result.append("mismatch", true); + return false; + } + } + + result.append("rs", true); + if( cmdObj["checkEmpty"].trueValue() ) { + result.append("hasData", replHasDatabases()); + } + if( theReplSet == 0 ) { + string from( cmdObj.getStringField("from") ); + if( !from.empty() ) { + scoped_lock lck( replSettings.discoveredSeeds_mx ); + replSettings.discoveredSeeds.insert(from); + } + result.append("hbmsg", "still initializing"); + return true; + } + + if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { + errmsg = "repl set names do not match (2)"; + result.append("mismatch", true); + return false; + } + result.append("set", theReplSet->name()); + result.append("state", theReplSet->state().s); + result.append("e", theReplSet->iAmElectable()); + result.append("hbmsg", theReplSet->hbmsg()); + result.append("time", (long long) time(0)); + result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); + int v = theReplSet->config().version; + result.append("v", v); + if( v > cmdObj["v"].Int() ) + result << "config" << theReplSet->config().asBson(); + + return true; + } + } cmdReplSetHeartbeat; + + bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, + int myCfgVersion, int& theirCfgVersion, bool 
checkEmpty) { + if( replSetBlind ) { + return false; + } + + BSONObj cmd = BSON( "replSetHeartbeat" << setName << + "v" << myCfgVersion << + "pv" << 1 << + "checkEmpty" << checkEmpty << + "from" << from ); + + // generally not a great idea to do outbound waiting calls in a + // write lock. heartbeats can be slow (multisecond to respond), so + // generally we don't want to be locked, at least not without + // thinking acarefully about it first. + uassert(15900, "can't heartbeat: too much lock", + !d.dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() ); + + ScopedConn conn(memberFullName); + return conn.runCommand("admin", cmd, result, 0); + } + + /** + * Poll every other set member to check its status. + * + * A detail about local machines and authentication: suppose we have 2 + * members, A and B, on the same machine using different keyFiles. A is + * primary. If we're just starting the set, there are no admin users, so A + * and B can access each other because it's local access. + * + * Then we add a user to A. B cannot sync this user from A, because as soon + * as we add a an admin user, A requires auth. However, A can still + * heartbeat B, because B *doesn't* have an admin user. So A can reach B + * but B cannot reach A. + * + * Once B is restarted with the correct keyFile, everything should work as + * expected. 
+ */ + class ReplSetHealthPollTask : public task::Task { + private: + HostAndPort h; + HeartbeatInfo m; + int tries; + const int threshold; + public: + ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) + : h(hh), m(mm), tries(0), threshold(15) { } + + string name() const { return "rsHealthPoll"; } + void doWork() { + if ( !theReplSet ) { + LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog; + return; + } + + HeartbeatInfo mem = m; + HeartbeatInfo old = mem; + try { + BSONObj info; + int theirConfigVersion = -10000; + + bool ok = _requestHeartbeat(mem, info, theirConfigVersion); + + // weight new ping with old pings + // on the first ping, just use the ping value + if (old.ping != 0) { + mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); + } + + if( ok ) { + up(info, mem); + } + else if (!info["errmsg"].eoo() && + info["errmsg"].str() == "need to login") { + authIssue(mem); + } + else { + down(mem, info.getStringField("errmsg")); + } + } + catch(DBException& e) { + down(mem, e.what()); + } + catch(...) 
{ + down(mem, "replSet unexpected exception in ReplSetHealthPollTask"); + } + m = mem; + + theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) ); + + static time_t last = 0; + time_t now = time(0); + bool changed = mem.changed(old); + if( changed ) { + if( old.hbstate != mem.hbstate ) + log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog; + } + if( changed || now-last>4 ) { + last = now; + theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + } + } + + private: + bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) { + if (tries++ % threshold == (threshold - 1)) { + ScopedConn conn(h.toString()); + conn.reconnect(); + } + + Timer timer; + time_t before = curTimeMicros64() / 1000000; + + bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), + h.toString(), info, theReplSet->config().version, theirConfigVersion); + + mem.ping = (unsigned int)timer.millis(); + + // we set this on any response - we don't get this far if + // couldn't connect because exception is thrown + time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + + if ( info["time"].isNumber() ) { + long long t = info["time"].numberLong(); + if( t > after ) + mem.skew = (int) (t - after); + else if( t < before ) + mem.skew = (int) (t - before); // negative + } + else { + // it won't be there if remote hasn't initialized yet + if( info.hasElement("time") ) + warning() << "heatbeat.time isn't a number: " << info << endl; + mem.skew = INT_MIN; + } + + { + be state = info["state"]; + if( state.ok() ) + mem.hbstate = MemberState(state.Int()); + } + + return ok; + } + + void authIssue(HeartbeatInfo& mem) { + mem.authIssue = true; + mem.hbstate = MemberState::RS_UNKNOWN; + + // set health to 0 so that this doesn't count towards majority + mem.health = 0.0; + theReplSet->rmFromElectable(mem.id()); + } + + void down(HeartbeatInfo& mem, string msg) { + 
mem.authIssue = false; + mem.health = 0.0; + mem.ping = 0; + if( mem.upSince || mem.downSince == 0 ) { + mem.upSince = 0; + mem.downSince = jsTime(); + mem.hbstate = MemberState::RS_DOWN; + log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog; + } + mem.lastHeartbeatMsg = msg; + theReplSet->rmFromElectable(mem.id()); + } + + void up(const BSONObj& info, HeartbeatInfo& mem) { + HeartbeatInfo::numPings++; + mem.authIssue = false; + + if( mem.upSince == 0 ) { + log() << "replSet member " << h.toString() << " is up" << rsLog; + mem.upSince = mem.lastHeartbeat; + } + mem.health = 1.0; + mem.lastHeartbeatMsg = info["hbmsg"].String(); + if( info.hasElement("opTime") ) + mem.opTime = info["opTime"].Date(); + + // see if this member is in the electable set + if( info["e"].eoo() ) { + // for backwards compatibility + const Member *member = theReplSet->findById(mem.id()); + if (member && member->config().potentiallyHot()) { + theReplSet->addToElectable(mem.id()); + } + else { + theReplSet->rmFromElectable(mem.id()); + } + } + // add this server to the electable set if it is within 10 + // seconds of the latest optime we know of + else if( info["e"].trueValue() && + mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { + unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); + if (lastOp > 0 && mem.opTime >= lastOp - 10) { + theReplSet->addToElectable(mem.id()); + } + } + else { + theReplSet->rmFromElectable(mem.id()); + } + + be cfg = info["config"]; + if( cfg.ok() ) { + // received a new config + boost::function<void()> f = + boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); + theReplSet->mgr->send(f); + } + } + }; + + void ReplSetImpl::endOldHealthTasks() { + unsigned sz = healthTasks.size(); + for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ ) + (*i)->halt(); + healthTasks.clear(); + if( sz ) + DEV log() << "replSet debug: cleared old tasks " << 
sz << endl; + } + + void ReplSetImpl::startHealthTaskFor(Member *m) { + ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); + healthTasks.insert(task); + task::repeat(task, 2000); + } + + void startSyncThread(); + + /** called during repl set startup. caller expects it to return fairly quickly. + note ReplSet object is only created once we get a config - so this won't run + until the initiation. + */ + void ReplSetImpl::startThreads() { + task::fork(mgr); + mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + + boost::thread t(startSyncThread); + + task::fork(ghost); + + // member heartbeats are started in ReplSetImpl::initFromConfig + } + +} + +/* todo: + stop bg job and delete on removefromset +*/ diff --git a/src/mongo/db/repl/manager.cpp b/src/mongo/db/repl/manager.cpp new file mode 100644 index 00000000000..91648a1b506 --- /dev/null +++ b/src/mongo/db/repl/manager.cpp @@ -0,0 +1,274 @@ +/* @file manager.cpp +*/ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "rs.h" +#include "connections.h" +#include "../client.h" + +namespace mongo { + + enum { + NOPRIMARY = -2, + SELFPRIMARY = -1 + }; + + /* check members OTHER THAN US to see if they think they are primary */ + const Member * Manager::findOtherPrimary(bool& two) { + two = false; + Member *m = rs->head(); + Member *p = 0; + while( m ) { + DEV assert( m != rs->_self ); + if( m->state().primary() && m->hbinfo().up() ) { + if( p ) { + two = true; + return 0; + } + p = m; + } + m = m->next(); + } + if( p ) + noteARemoteIsPrimary(p); + return p; + } + + Manager::Manager(ReplSetImpl *_rs) : + task::Server("rsMgr"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) { + } + + Manager::~Manager() { + /* we don't destroy the replset object we sit in; however, the destructor could have thrown on init. + the log message below is just a reminder to come back one day and review this code more, and to + make it cleaner. + */ + log() << "info: ~Manager called" << rsLog; + rs->mgr = 0; + } + + void Manager::starting() { + Client::initThread("rsMgr"); + replLocalAuth(); + } + + void Manager::noteARemoteIsPrimary(const Member *m) { + if( rs->box.getPrimary() == m ) + return; + rs->_self->lhb() = ""; + if( rs->iAmArbiterOnly() ) { + rs->box.set(MemberState::RS_ARBITER, m); + } + else { + rs->box.noteRemoteIsPrimary(m); + } + } + + void Manager::checkElectableSet() { + unsigned otherOp = rs->lastOtherOpTime().getSecs(); + + // make sure the electable set is up-to-date + if (rs->elect.aMajoritySeemsToBeUp() && + rs->iAmPotentiallyHot() && + (otherOp == 0 || rs->lastOpTimeWritten.getSecs() >= otherOp - 10)) { + theReplSet->addToElectable(rs->selfId()); + } + else { + theReplSet->rmFromElectable(rs->selfId()); + } + + // check if we should ask the primary (possibly ourselves) to step down + const Member *highestPriority = theReplSet->getMostElectable(); + const Member *primary = rs->box.getPrimary(); + + if (primary && highestPriority && + 
highestPriority->config().priority > primary->config().priority) { + log() << "stepping down " << primary->fullName() << endl; + + if (primary->h().isSelf()) { + // replSetStepDown tries to acquire the same lock + // msgCheckNewState takes, so we can't call replSetStepDown on + // ourselves. + rs->relinquish(); + } + else { + BSONObj cmd = BSON( "replSetStepDown" << 1 ); + ScopedConn conn(primary->fullName()); + BSONObj result; + if (!conn.runCommand("admin", cmd, result, 0)) { + log() << "stepping down " << primary->fullName() + << " failed: " << result << endl; + } + } + } + } + + void Manager::checkAuth() { + int down = 0, authIssue = 0, total = 0; + + for( Member *m = rs->head(); m; m=m->next() ) { + total++; + + // all authIssue servers will also be not up + if (!m->hbinfo().up()) { + down++; + if (m->hbinfo().authIssue) { + authIssue++; + } + } + } + + // if all nodes are down or failed auth AND at least one failed + // auth, go into recovering. If all nodes are down, stay a + // secondary. + if (authIssue > 0 && down == total) { + log() << "replset error could not reach/authenticate against any members" << endl; + + if (rs->box.getPrimary() == rs->_self) { + log() << "auth problems, relinquishing primary" << rsLog; + rs->relinquish(); + } + + rs->blockSync(true); + } + else { + rs->blockSync(false); + } + } + + /** called as the health threads get new results */ + void Manager::msgCheckNewState() { + { + theReplSet->assertValid(); + rs->assertValid(); + + RSBase::lock lk(rs); + + if( busyWithElectSelf ) return; + + checkElectableSet(); + checkAuth(); + + const Member *p = rs->box.getPrimary(); + if( p && p != rs->_self ) { + if( !p->hbinfo().up() || + !p->hbinfo().hbstate.primary() ) { + p = 0; + rs->box.setOtherPrimary(0); + } + } + + const Member *p2; + { + bool two; + p2 = findOtherPrimary(two); + if( two ) { + /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. 
*/ + log() << "replSet info two primaries (transiently)" << rsLog; + return; + } + } + + if( p2 ) { + /* someone else thinks they are primary. */ + if( p == p2 ) { + // we thought the same; all set. + return; + } + if( p == 0 ) { + noteARemoteIsPrimary(p2); + return; + } + // todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + if( p != rs->_self ) { + // switch primary from oldremotep->newremotep2 + noteARemoteIsPrimary(p2); + return; + } + /* we thought we were primary, yet now someone else thinks they are. */ + if( !rs->elect.aMajoritySeemsToBeUp() ) { + /* we can't see a majority. so the other node is probably the right choice. */ + noteARemoteIsPrimary(p2); + return; + } + /* ignore for now, keep thinking we are master. + this could just be timing (we poll every couple seconds) or could indicate + a problem? if it happens consistently for a duration of time we should + alert the sysadmin. + */ + return; + } + + /* didn't find anyone who wants to be primary */ + + if( p ) { + /* we are already primary */ + + if( p != rs->_self ) { + rs->sethbmsg("error p != rs->self in checkNewState"); + log() << "replSet " << p->fullName() << rsLog; + log() << "replSet " << rs->_self->fullName() << rsLog; + return; + } + + if( rs->elect.shouldRelinquish() ) { + log() << "can't see a majority of the set, relinquishing primary" << rsLog; + rs->relinquish(); + } + + return; + } + + if( !rs->iAmPotentiallyHot() ) { // if not we never try to be primary + OCCASIONALLY log() << "replSet I don't see a primary and I can't elect myself" << endl; + return; + } + + /* no one seems to be primary. shall we try to elect ourself? 
*/ + if( !rs->elect.aMajoritySeemsToBeUp() ) { + static time_t last; + static int n; + int ll = 0; + if( ++n > 5 ) ll++; + if( last + 60 > time(0 ) ) ll++; + log(ll) << "replSet can't see a majority, will not try to elect self" << rsLog; + last = time(0); + return; + } + + if( !rs->iAmElectable() ) { + return; + } + + busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one. + } + try { + rs->elect.electSelf(); + } + catch(RetryAfterSleepException&) { + /* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */ + requeue(); + } + catch(...) { + log() << "replSet error unexpected assertion in rs manager" << rsLog; + } + busyWithElectSelf = false; + } + +} diff --git a/src/mongo/db/repl/multicmd.h b/src/mongo/db/repl/multicmd.h new file mode 100644 index 00000000000..2d70c551f64 --- /dev/null +++ b/src/mongo/db/repl/multicmd.h @@ -0,0 +1,75 @@ +// @file multicmd.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../../util/background.h" +#include "connections.h" + +namespace mongo { + + struct Target { + Target(string hostport) : toHost(hostport), ok(false) { } + //Target() : ok(false) { } + const string toHost; + bool ok; + BSONObj result; + }; + + /** send a command to several servers in parallel. 
waits for all to complete before 
+       returning. 
+ 
+       in: Target::toHost 
+       out: Target::result and Target::ok 
+    */ 
+    void multiCommand(BSONObj cmd, list<Target>& L); 
+ 
+    /* one background job per target host; internal helper for multiCommand(). 
+       NOTE: holds references to the command and to the Target, so both must 
+       outlive the job -- multiCommand() keeps them alive until wait() returns. */ 
+    class _MultiCommandJob : public BackgroundJob { 
+    public: 
+        BSONObj& cmd; 
+        Target& d; 
+        _MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { } 
+ 
+    private: 
+        string name() const { return "MultiCommandJob"; } 
+        /* run 'cmd' against the admin db of d.toHost, recording the outcome in 
+           d.ok / d.result.  a DBException leaves d.ok false (Target constructs 
+           it false) and is otherwise swallowed, logged in dev builds only. */ 
+        void run() { 
+            try { 
+                ScopedConn c(d.toHost); 
+                d.ok = c.runCommand("admin", cmd, d.result); 
+            } 
+            catch(DBException&) { 
+                DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog; 
+            } 
+        } 
+    }; 
+ 
+    inline void multiCommand(BSONObj cmd, list<Target>& L) { 
+        list< shared_ptr<BackgroundJob> > jobs; 
+ 
+        // fan out: start one background job per target 
+        for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { 
+            Target& d = *i; 
+            _MultiCommandJob *j = new _MultiCommandJob(cmd, d); 
+            jobs.push_back( shared_ptr<BackgroundJob>(j) ); 
+            j->go(); 
+        } 
+ 
+        // fan in: block until every job has finished 
+        for( list< shared_ptr<BackgroundJob> >::iterator i = jobs.begin(); i != jobs.end(); i++ ) { 
+            (*i)->wait(); 
+        } 
+    } 
+} 
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp 
new file mode 100644 
index 00000000000..84f16e53466 
--- /dev/null 
+++ b/src/mongo/db/repl/replset_commands.cpp 
@@ -0,0 +1,404 @@ 
+/** 
+* Copyright (C) 2008 10gen Inc. 
+* 
+* This program is free software: you can redistribute it and/or modify 
+* it under the terms of the GNU Affero General Public License, version 3, 
+* as published by the Free Software Foundation. 
+* 
+* This program is distributed in the hope that it will be useful, 
+* but WITHOUT ANY WARRANTY; without even the implied warranty of 
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
+* GNU Affero General Public License for more details. 
+* 
+* You should have received a copy of the GNU Affero General Public License 
+* along with this program.  If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "../cmdline.h" +#include "../commands.h" +#include "../repl.h" +#include "health.h" +#include "rs.h" +#include "rs_config.h" +#include "../dbwebserver.h" +#include "../../util/mongoutils/html.h" +#include "../../client/dbclient.h" +#include "../repl_block.h" + +using namespace bson; + +namespace mongo { + + void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial); + + /* commands in other files: + replSetHeartbeat - health.cpp + replSetInitiate - rs_mod.cpp + */ + + bool replSetBlind = false; + unsigned replSetForceInitialSyncFailure = 0; + + class CmdReplSetTest : public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "Just for regression tests.\n"; + } + CmdReplSetTest() : ReplSetCommand("replSetTest") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog; + + if (!checkAuth(errmsg, result)) { + return false; + } + + if( cmdObj.hasElement("forceInitialSyncFailure") ) { + replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number(); + return true; + } + + if( !check(errmsg, result) ) + return false; + + if( cmdObj.hasElement("blind") ) { + replSetBlind = cmdObj.getBoolField("blind"); + return true; + } + + if (cmdObj.hasElement("sethbmsg")) { + replset::sethbmsg(cmdObj["sethbmsg"].String()); + return true; + } + + return false; + } + } cmdReplSetTest; + + /** get rollback id. used to check if a rollback happened during some interval of time. + as consumed, the rollback id is not in any particular order, it simply changes on each rollback. + @see incRBID() + */ + class CmdReplSetGetRBID : public ReplSetCommand { + public: + /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... 
*/ + int rbid; + virtual void help( stringstream &help ) const { + help << "internal"; + } + CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") { + // this is ok but micros or combo with some rand() and/or 64 bits might be better -- + // imagine a restart and a clock correction simultaneously (very unlikely but possible...) + rbid = (int) curTimeMillis64(); + } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + result.append("rbid",rbid); + return true; + } + } cmdReplSetRBID; + + /** we increment the rollback id on every rollback event. */ + void incRBID() { + cmdReplSetRBID.rbid++; + } + + /** helper to get rollback id from another server. */ + int getRBID(DBClientConnection *c) { + bo info; + c->simpleCommand("admin", &info, "replSetGetRBID"); + return info["rbid"].numberInt(); + } + + class CmdReplSetGetStatus : public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "Report status of a replica set from the POV of this server\n"; + help << "{ replSetGetStatus : 1 }"; + help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if ( cmdObj["forShell"].trueValue() ) + lastError.disableForCommand(); + + if( !check(errmsg, result) ) + return false; + theReplSet->summarizeStatus(result); + return true; + } + } cmdReplSetGetStatus; + + class CmdReplSetReconfig : public ReplSetCommand { + RWLock mutex; /* we don't need rw but we wanted try capability. 
:-( */ + public: + virtual void help( stringstream &help ) const { + help << "Adjust configuration of a replica set\n"; + help << "{ replSetReconfig : config_object }"; + help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { } + virtual bool run(const string& a, BSONObj& b, int e, string& errmsg, BSONObjBuilder& c, bool d) { + try { + rwlock_try_write lk(mutex); + return _run(a,b,e,errmsg,c,d); + } + catch(rwlock_try_write::exception&) { } + errmsg = "a replSetReconfig is already in progress"; + return false; + } + private: + bool _run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if ( !checkAuth(errmsg, result) ) { + return false; + } + + if( cmdObj["replSetReconfig"].type() != Object ) { + errmsg = "no configuration specified"; + return false; + } + + bool force = cmdObj.hasField("force") && cmdObj["force"].trueValue(); + if( force && !theReplSet ) { + replSettings.reconfig = cmdObj["replSetReconfig"].Obj().getOwned(); + result.append("msg", "will try this config momentarily, try running rs.conf() again in a few seconds"); + return true; + } + + if ( !check(errmsg, result) ) { + return false; + } + + if( !force && !theReplSet->box.getState().primary() ) { + errmsg = "replSetReconfig command must be sent to the current replica set primary."; + return false; + } + + { + // just make sure we can get a write lock before doing anything else. we'll reacquire one + // later. of course it could be stuck then, but this check lowers the risk if weird things + // are up - we probably don't want a change to apply 30 minutes after the initial attempt. + time_t t = time(0); + writelock lk(""); + if( time(0)-t > 20 ) { + errmsg = "took a long time to get write lock, so not initiating. 
Initiate when server less busy?"; + return false; + } + } + + try { + ReplSetConfig newConfig(cmdObj["replSetReconfig"].Obj(), force); + + log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; + + if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) { + return false; + } + + checkMembersUpForConfigChange(newConfig, result, false); + + log() << "replSet replSetReconfig [2]" << rsLog; + + theReplSet->haveNewConfig(newConfig, true); + ReplSet::startupStatusMsg.set("replSetReconfig'd"); + } + catch( DBException& e ) { + log() << "replSet replSetReconfig exception: " << e.what() << rsLog; + throw; + } + catch( string& se ) { + log() << "replSet reconfig exception: " << se << rsLog; + errmsg = se; + return false; + } + + resetSlaveCache(); + return true; + } + } cmdReplSetReconfig; + + class CmdReplSetFreeze : public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "{ replSetFreeze : <seconds> }"; + help << "'freeze' state of member to the extent we can do that. 
What this really means is that\n"; + help << "this node will not attempt to become primary until the time period specified expires.\n"; + help << "You can call again with {replSetFreeze:0} to unfreeze sooner.\n"; + help << "A process restart unfreezes the member also.\n"; + help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + + CmdReplSetFreeze() : ReplSetCommand("replSetFreeze") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + int secs = (int) cmdObj.firstElement().numberInt(); + if( theReplSet->freeze(secs) ) { + if( secs == 0 ) + result.append("info","unfreezing"); + } + if( secs == 1 ) + result.append("warning", "you really want to freeze for only 1 second?"); + return true; + } + } cmdReplSetFreeze; + + class CmdReplSetStepDown: public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "{ replSetStepDown : <seconds> }\n"; + help << "Step down as primary. 
Will not try to reelect self for the specified time period (1 minute if no numeric secs value specified).\n"; + help << "(If another member with same priority takes over in the meantime, it will stay primary.)\n"; + help << "http://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + + CmdReplSetStepDown() : ReplSetCommand("replSetStepDown") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + if( !theReplSet->box.getState().primary() ) { + errmsg = "not primary so can't step down"; + return false; + } + + bool force = cmdObj.hasField("force") && cmdObj["force"].trueValue(); + + // only step down if there is another node synced to within 10 + // seconds of this node + if (!force) { + long long int lastOp = (long long int)theReplSet->lastOpTimeWritten.getSecs(); + long long int closest = (long long int)theReplSet->lastOtherOpTime().getSecs(); + + long long int diff = lastOp - closest; + result.append("closest", closest); + result.append("difference", diff); + + if (diff < 0) { + // not our problem, but we'll wait until thing settle down + errmsg = "someone is ahead of the primary?"; + return false; + } + + if (diff > 10) { + errmsg = "no secondaries within 10 seconds of my optime"; + return false; + } + } + + int secs = (int) cmdObj.firstElement().numberInt(); + if( secs == 0 ) + secs = 60; + return theReplSet->stepDown(secs); + } + } cmdReplSetStepDown; + + class CmdReplSetMaintenance: public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "{ replSetMaintenance : bool }\n"; + help << "Enable or disable maintenance mode."; + } + + CmdReplSetMaintenance() : ReplSetCommand("replSetMaintenance") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + if( theReplSet->box.getState().primary() ) { + errmsg = 
"primaries can't modify maintenance mode"; + return false; + } + + theReplSet->setMaintenanceMode(cmdObj["replSetMaintenance"].trueValue()); + return true; + } + } cmdReplSetMaintenance; + + using namespace bson; + using namespace mongoutils::html; + extern void fillRsLog(stringstream&); + + class ReplSetHandler : public DbWebHandler { + public: + ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ) {} + + virtual bool handles( const string& url ) const { + return startsWith( url , "/_replSet" ); + } + + virtual void handle( const char *rq, string url, BSONObj params, + string& responseMsg, int& responseCode, + vector<string>& headers, const SockAddr &from ) { + + if( url == "/_replSetOplog" ) { + responseMsg = _replSetOplog(params); + } + else + responseMsg = _replSet(); + responseCode = 200; + } + + string _replSetOplog(bo parms) { + int _id = (int) str::toUnsigned( parms["_id"].String() ); + + stringstream s; + string t = "Replication oplog"; + s << start(t); + s << p(t); + + if( theReplSet == 0 ) { + if( cmdLine._replSet.empty() ) + s << p("Not using --replSet"); + else { + s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + + ".<br>" + ReplSet::startupStatusMsg.get()); + } + } + else { + try { + theReplSet->getOplogDiagsAsHtml(_id, s); + } + catch(std::exception& e) { + s << "error querying oplog: " << e.what() << '\n'; + } + } + + s << _end(); + return s.str(); + } + + /* /_replSet show replica set status in html format */ + string _replSet() { + stringstream s; + s << start("Replica Set Status " + prettyHostName()); + s << p( a("/", "back", "Home") + " | " + + a("/local/system.replset/?html=1", "", "View Replset Config") + " | " + + a("/replSetGetStatus?text=1", "", "replSetGetStatus") + " | " + + a("http://www.mongodb.org/display/DOCS/Replica+Sets", "", "Docs") + ); + + if( theReplSet == 0 ) { + if( cmdLine._replSet.empty() ) + s << p("Not using 
--replSet"); + else { + s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + + ".<br>" + ReplSet::startupStatusMsg.get()); + } + } + else { + try { + theReplSet->summarizeAsHtml(s); + } + catch(...) { s << "error summarizing replset status\n"; } + } + s << p("Recent replset log activity:"); + fillRsLog(s); + s << _end(); + return s.str(); + } + + + + } replSetHandler; + +} diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp new file mode 100644 index 00000000000..fff5d72bcc0 --- /dev/null +++ b/src/mongo/db/repl/rs.cpp @@ -0,0 +1,778 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "../cmdline.h" +#include "../../util/net/sock.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "../dbhelpers.h" +#include "../../s/d_logic.h" +#include "rs.h" +#include "connections.h" +#include "../repl.h" +#include "../instance.h" + +using namespace std; + +namespace mongo { + + using namespace bson; + + bool replSet = false; + ReplSet *theReplSet = 0; + + bool isCurrentlyAReplSetPrimary() { + return theReplSet && theReplSet->isPrimary(); + } + + void replset::sethbmsg(const string& s, const int level) { + if (theReplSet) { + theReplSet->sethbmsg(s, logLevel); + } + } + + void ReplSetImpl::sethbmsg(string s, int logLevel) { + static time_t lastLogged; + _hbmsgTime = time(0); + + if( s == _hbmsg ) { + // unchanged + if( _hbmsgTime - lastLogged < 60 ) + return; + } + + unsigned sz = s.size(); + if( sz >= 256 ) + memcpy(_hbmsg, s.c_str(), 255); + else { + _hbmsg[sz] = 0; + memcpy(_hbmsg, s.c_str(), sz); + } + if( !s.empty() ) { + lastLogged = _hbmsgTime; + log(logLevel) << "replSet " << s << rsLog; + } + } + + void ReplSetImpl::assumePrimary() { + LOG(2) << "replSet assuming primary" << endl; + assert( iAmPotentiallyHot() ); + writelock lk("admin."); // so we are synchronized with _logOp() + + // Make sure that new OpTimes are higher than existing ones even with clock skew + DBDirectClient c; + BSONObj lastOp = c.findOne( "local.oplog.rs", Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk ); + if ( !lastOp.isEmpty() ) { + OpTime::setLast( lastOp[ "ts" ].date() ); + } + + changeState(MemberState::RS_PRIMARY); + } + + void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } + + void ReplSetImpl::setMaintenanceMode(const bool inc) { + lock lk(this); + + if (inc) { + log() << "replSet going into maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; + + _maintenanceMode++; + changeState(MemberState::RS_RECOVERING); + } + else { + _maintenanceMode--; + // no need to change 
state, syncTail will try to go live as a secondary soon + + log() << "leaving maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; + } + } + + Member* ReplSetImpl::getMostElectable() { + lock lk(this); + + Member *max = 0; + + for (set<unsigned>::iterator it = _electableSet.begin(); it != _electableSet.end(); it++) { + const Member *temp = findById(*it); + if (!temp) { + log() << "couldn't find member: " << *it << endl; + _electableSet.erase(*it); + continue; + } + if (!max || max->config().priority < temp->config().priority) { + max = (Member*)temp; + } + } + + return max; + } + + const bool closeOnRelinquish = true; + + void ReplSetImpl::relinquish() { + LOG(2) << "replSet attempting to relinquish" << endl; + if( box.getState().primary() ) { + { + writelock lk("admin."); // so we are synchronized with _logOp() + + log() << "replSet relinquishing primary state" << rsLog; + changeState(MemberState::RS_SECONDARY); + } + + if( closeOnRelinquish ) { + /* close sockets that were talking to us so they don't blithly send many writes that will fail + with "not master" (of course client could check result code, but in case they are not) + */ + log() << "replSet closing client sockets after reqlinquishing primary" << rsLog; + MessagingPort::closeAllSockets(1); + } + + // now that all connections were closed, strip this mongod from all sharding details + // if and when it gets promoted to a primary again, only then it should reload the sharding state + // the rationale here is that this mongod won't bring stale state when it regains primaryhood + shardingState.resetShardingState(); + + } + else if( box.getState().startup2() ) { + // ? add comment + changeState(MemberState::RS_RECOVERING); + } + } + + /* look freshly for who is primary - includes relinquishing ourself. 
*/ + void ReplSetImpl::forgetPrimary() { + if( box.getState().primary() ) + relinquish(); + else { + box.setOtherPrimary(0); + } + } + + // for the replSetStepDown command + bool ReplSetImpl::_stepDown(int secs) { + lock lk(this); + if( box.getState().primary() ) { + elect.steppedDown = time(0) + secs; + log() << "replSet info stepping down as primary secs=" << secs << rsLog; + relinquish(); + return true; + } + return false; + } + + bool ReplSetImpl::_freeze(int secs) { + lock lk(this); + /* note if we are primary we remain primary but won't try to elect ourself again until + this time period expires. + */ + if( secs == 0 ) { + elect.steppedDown = 0; + log() << "replSet info 'unfreezing'" << rsLog; + } + else { + if( !box.getState().primary() ) { + elect.steppedDown = time(0) + secs; + log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog; + } + else { + log() << "replSet info received freeze command but we are primary" << rsLog; + } + } + return true; + } + + void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { + for( Member *m = _members.head(); m; m=m->next() ) { + if( m->id() == h.id() ) { + m->_hbinfo = h; + return; + } + } + } + + list<HostAndPort> ReplSetImpl::memberHostnames() const { + list<HostAndPort> L; + L.push_back(_self->h()); + for( Member *m = _members.head(); m; m = m->next() ) + L.push_back(m->h()); + return L; + } + + void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) { + assert( m ); + if( m->config().hidden ) + return; + + if( m->potentiallyHot() ) { + hosts.push_back(m->h().toString()); + } + else if( !m->config().arbiterOnly ) { + if( m->config().slaveDelay ) { + /* hmmm - we don't list these as they are stale. 
*/ + } + else { + passives.push_back(m->h().toString()); + } + } + else { + arbiters.push_back(m->h().toString()); + } + } + + void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) { + lock lk(this); + + const StateBox::SP sp = box.get(); + bool isp = sp.state.primary(); + b.append("setName", name()); + b.append("ismaster", isp); + b.append("secondary", sp.state.secondary()); + { + vector<string> hosts, passives, arbiters; + _fillIsMasterHost(_self, hosts, passives, arbiters); + + for( Member *m = _members.head(); m; m = m->next() ) { + assert( m ); + _fillIsMasterHost(m, hosts, passives, arbiters); + } + + if( hosts.size() > 0 ) { + b.append("hosts", hosts); + } + if( passives.size() > 0 ) { + b.append("passives", passives); + } + if( arbiters.size() > 0 ) { + b.append("arbiters", arbiters); + } + } + + if( !isp ) { + const Member *m = sp.primary; + if( m ) + b.append("primary", m->h().toString()); + } + else { + b.append("primary", _self->fullName()); + } + + if( myConfig().arbiterOnly ) + b.append("arbiterOnly", true); + if( myConfig().priority == 0 && !myConfig().arbiterOnly) + b.append("passive", true); + if( myConfig().slaveDelay ) + b.append("slaveDelay", myConfig().slaveDelay); + if( myConfig().hidden ) + b.append("hidden", true); + if( !myConfig().buildIndexes ) + b.append("buildIndexes", false); + if( !myConfig().tags.empty() ) { + BSONObjBuilder a; + for( map<string,string>::const_iterator i = myConfig().tags.begin(); i != myConfig().tags.end(); i++ ) + a.append((*i).first, (*i).second); + b.append("tags", a.done()); + } + b.append("me", myConfig().h.toString()); + } + + /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ + + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) { + const char *p = cfgString.c_str(); + const char *slash = strchr(p, '/'); + if( slash ) + setname = string(p, slash-p); + else + setname = p; + uassert(13093, "bad --replSet config string format is: 
<setname>[/<seedhost1>,<seedhost2>,...]", !setname.empty()); + + if( slash == 0 ) + return; + + p = slash + 1; + while( 1 ) { + const char *comma = strchr(p, ','); + if( comma == 0 ) comma = strchr(p,0); + if( p == comma ) + break; + { + HostAndPort m; + try { + m = HostAndPort( string(p, comma-p) ); + } + catch(...) { + uassert(13114, "bad --replSet seed hostname", false); + } + uassert(13096, "bad --replSet command line config string - dups?", seedSet.count(m) == 0 ); + seedSet.insert(m); + //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost()); + if( m.isSelf() ) { + log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog; + } + else + seeds.push_back(m); + if( *comma == 0 ) + break; + p = comma + 1; + } + } + } + + ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), + _currentSyncTarget(0), + _blockSync(false), + _hbmsgTime(0), + _self(0), + _maintenanceMode(0), + mgr( new Manager(this) ), + ghost( new GhostSync(this) ) { + + _cfg = 0; + memset(_hbmsg, 0, sizeof(_hbmsg)); + strcpy( _hbmsg , "initial startup" ); + lastH = 0; + changeState(MemberState::RS_STARTUP); + + _seeds = &replSetCmdline.seeds; + + LOG(1) << "replSet beginning startup..." << rsLog; + + loadConfig(); + + unsigned sss = replSetCmdline.seedSet.size(); + for( Member *m = head(); m; m = m->next() ) { + replSetCmdline.seedSet.erase(m->h()); + } + for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { + if( i->isSelf() ) { + if( sss == 1 ) { + LOG(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" 
<< rsLog; + } + } + else { + log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; + } + } + } + + void newReplUp(); + + void ReplSetImpl::loadLastOpTimeWritten(bool quiet) { + readlock lk(rsoplog); + BSONObj o; + if( Helpers::getLast(rsoplog, o) ) { + lastH = o["h"].numberLong(); + lastOpTimeWritten = o["ts"]._opTime(); + uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull()); + } + } + + /* call after constructing to start - returns fairly quickly after launching its threads */ + void ReplSetImpl::_go() { + try { + loadLastOpTimeWritten(); + } + catch(std::exception& e) { + log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; + log() << e.what() << rsLog; + sleepsecs(30); + dbexit( EXIT_REPLICATION_ERROR ); + return; + } + + changeState(MemberState::RS_STARTUP2); + startThreads(); + newReplUp(); // oplog.cpp + } + + ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; + DiagStr ReplSetImpl::startupStatusMsg; + + extern BSONObj *getLastErrorDefault; + + void ReplSetImpl::setSelfTo(Member *m) { + // already locked in initFromConfig + _self = m; + _id = m->id(); + _config = m->config(); + if( m ) _buildIndexes = m->config().buildIndexes; + else _buildIndexes = true; + } + + /** @param reconf true if this is a reconfiguration and not an initial load of the configuration. + @return true if ok; throws if config really bad; false if config doesn't include self + */ + bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { + /* NOTE: haveNewConfig() writes the new config to disk before we get here. So + we cannot error out at this point, except fatally. Check errors earlier. 
+ */ + lock lk(this); + + if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) { + // see comment in dbcommands.cpp for getlasterrordefault + getLastErrorDefault = new BSONObj( c.getLastErrorDefaults ); + } + + list<ReplSetConfig::MemberCfg*> newOnes; + // additive short-cuts the new config setup. If we are just adding a + // node/nodes and nothing else is changing, this is additive. If it's + // not a reconfig, we're not adding anything + bool additive = reconf; + { + unsigned nfound = 0; + int me = 0; + for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { + + ReplSetConfig::MemberCfg& m = *i; + if( m.h.isSelf() ) { + me++; + } + + if( reconf ) { + if (m.h.isSelf() && (!_self || (int)_self->id() != m._id)) { + log() << "self doesn't match: " << m._id << rsLog; + assert(false); + } + + const Member *old = findById(m._id); + if( old ) { + nfound++; + assert( (int) old->id() == m._id ); + if( old->config() != m ) { + additive = false; + } + } + else { + newOnes.push_back(&m); + } + } + } + if( me == 0 ) { + _members.orphanAll(); + + // sending hbs must continue to pick up new config, so we leave + // hb threads alone + + // close sockets to force clients to re-evaluate this member + MessagingPort::closeAllSockets(0); + + // stop sync thread + box.set(MemberState::RS_STARTUP, 0); + + // go into holding pattern + log() << "replSet error self not present in the repl set configuration:" << rsLog; + log() << c.toString() << rsLog; + return false; + } + uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); + + // if we found different members that the original config, reload everything + if( reconf && config().members.size() != nfound ) + additive = false; + } + + _cfg = new ReplSetConfig(c); + assert( _cfg->ok() ); + assert( _name.empty() || _name == _cfg->_id ); + _name = _cfg->_id; + assert( !_name.empty() ); + + // this is a shortcut for simple changes + if( additive ) { + 
log() << "replSet info : additive change to configuration" << rsLog; + for( list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { + ReplSetConfig::MemberCfg *m = *i; + Member *mi = new Member(m->h, m->_id, m, false); + + /** we will indicate that new members are up() initially so that we don't relinquish our + primary state because we can't (transiently) see a majority. they should be up as we + check that new members are up before getting here on reconfig anyway. + */ + mi->get_hbinfo().health = 0.1; + + _members.push(mi); + startHealthTaskFor(mi); + } + + // if we aren't creating new members, we may have to update the + // groups for the current ones + _cfg->updateMembers(_members); + + return true; + } + + // start with no members. if this is a reconfig, drop the old ones. + _members.orphanAll(); + + endOldHealthTasks(); + + int oldPrimaryId = -1; + { + const Member *p = box.getPrimary(); + if( p ) + oldPrimaryId = p->id(); + } + forgetPrimary(); + + // not setting _self to 0 as other threads use _self w/o locking + int me = 0; + + // For logging + string members = ""; + + for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { + ReplSetConfig::MemberCfg& m = *i; + Member *mi; + members += ( members == "" ? "" : ", " ) + m.h.toString(); + if( m.h.isSelf() ) { + assert( me++ == 0 ); + mi = new Member(m.h, m._id, &m, true); + if (!reconf) { + log() << "replSet I am " << m.h.toString() << rsLog; + } + setSelfTo(mi); + + if( (int)mi->id() == oldPrimaryId ) + box.setSelfPrimary(mi); + } + else { + mi = new Member(m.h, m._id, &m, false); + _members.push(mi); + startHealthTaskFor(mi); + if( (int)mi->id() == oldPrimaryId ) + box.setOtherPrimary(mi); + } + } + + if( me == 0 ){ + log() << "replSet warning did not detect own host in full reconfig, members " << members << " config: " << c << rsLog; + } + + return true; + } + + // Our own config must be the first one. 
+ bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) { + int v = -1; + ReplSetConfig *highest = 0; + int myVersion = -2000; + int n = 0; + for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { + ReplSetConfig& cfg = *i; + if( ++n == 1 ) myVersion = cfg.version; + if( cfg.ok() && cfg.version > v ) { + highest = &cfg; + v = cfg.version; + } + } + assert( highest ); + + if( !initFromConfig(*highest) ) + return false; + + if( highest->version > myVersion && highest->version >= 0 ) { + log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; + highest->saveConfigLocally(BSONObj()); + } + return true; + } + + void ReplSetImpl::loadConfig() { + while( 1 ) { + startupStatus = LOADINGCONFIG; + startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)"); + LOG(1) << "loadConfig() " << rsConfigNs << endl; + try { + vector<ReplSetConfig> configs; + try { + configs.push_back( ReplSetConfig(HostAndPort::me()) ); + } + catch(DBException& e) { + log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog; + } + for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { + try { + configs.push_back( ReplSetConfig(*i) ); + } + catch( DBException& e ) { + log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog; + } + } + { + scoped_lock lck( replSettings.discoveredSeeds_mx ); + if( replSettings.discoveredSeeds.size() > 0 ) { + for (set<string>::iterator i = replSettings.discoveredSeeds.begin(); + i != replSettings.discoveredSeeds.end(); + i++) { + try { + configs.push_back( ReplSetConfig(HostAndPort(*i)) ); + } + catch( DBException& ) { + log(1) << "replSet exception trying to load config from discovered seed " << *i << rsLog; + replSettings.discoveredSeeds.erase(*i); + } + } + } + } + + if (!replSettings.reconfig.isEmpty()) { + try { + 
configs.push_back(ReplSetConfig(replSettings.reconfig, true)); + } + catch( DBException& re) { + log() << "replSet couldn't load reconfig: " << re.what() << rsLog; + replSettings.reconfig = BSONObj(); + } + } + + int nok = 0; + int nempty = 0; + for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) { + if( i->ok() ) + nok++; + if( i->empty() ) + nempty++; + } + if( nok == 0 ) { + + if( nempty == (int) configs.size() ) { + startupStatus = EMPTYCONFIG; + startupStatusMsg.set("can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"); + log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; + static unsigned once; + if( ++once == 1 ) { + log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog; + } + if( _seeds->size() == 0 ) { + LOG(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; + } + } + else { + startupStatus = EMPTYUNREACHABLE; + startupStatusMsg.set("can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"); + log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog; + } + + sleepsecs(10); + continue; + } + + if( !_loadConfigFinish(configs) ) { + log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog; + sleepsecs(20); + continue; + } + } + catch(DBException& e) { + startupStatus = BADCONFIG; + startupStatusMsg.set("replSet error loading set config (BADCONFIG)"); + log() << "replSet error loading configurations " << e.toString() << rsLog; + log() << "replSet error replication will not start" << rsLog; + sethbmsg("error loading set config"); + _fatal(); + throw; + } + break; + } + startupStatusMsg.set("? 
started"); + startupStatus = STARTED; + } + + void ReplSetImpl::_fatal() { + box.set(MemberState::RS_FATAL, 0); + log() << "replSet error fatal, stopping replication" << rsLog; + } + + void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { + bo comment; + if( addComment ) + comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); + + newConfig.saveConfigLocally(comment); + + try { + if (initFromConfig(newConfig, true)) { + log() << "replSet replSetReconfig new config saved locally" << rsLog; + } + } + catch(DBException& e) { + if( e.getCode() == 13497 /* removed from set */ ) { + cc().shutdown(); + dbexit( EXIT_CLEAN , "removed from replica set" ); // never returns + assert(0); + } + log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog; + _fatal(); + } + catch(...) { + log() << "replSet error unexpected exception in haveNewConfig()" << rsLog; + _fatal(); + } + } + + void Manager::msgReceivedNewConfig(BSONObj o) { + log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog; + ReplSetConfig c(o); + if( c.version > rs->config().version ) + theReplSet->haveNewConfig(c, false); + else { + log() << "replSet info msgReceivedNewConfig but version isn't higher " << + c.version << ' ' << rs->config().version << rsLog; + } + } + + /* forked as a thread during startup + it can run quite a while looking for config. but once found, + a separate thread takes over as ReplSetImpl::Manager, and this thread + terminates. 
+ */ + void startReplSets(ReplSetCmdline *replSetCmdline) { + Client::initThread("rsStart"); + try { + assert( theReplSet == 0 ); + if( replSetCmdline == 0 ) { + assert(!replSet); + return; + } + replLocalAuth(); + (theReplSet = new ReplSet(*replSetCmdline))->go(); + } + catch(std::exception& e) { + log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; + if( theReplSet ) + theReplSet->fatal(); + } + cc().shutdown(); + } + + void replLocalAuth() { + if ( noauth ) + return; + cc().getAuthenticationInfo()->authorize("local","_repl"); + } + + +} + +namespace boost { + + void assertion_failed(char const * expr, char const * function, char const * file, long line) { + mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl; + } + +} diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h new file mode 100644 index 00000000000..8e43204be3b --- /dev/null +++ b/src/mongo/db/repl/rs.h @@ -0,0 +1,667 @@ +// /db/repl/rs.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#pragma once + +#include "../../util/concurrency/list.h" +#include "../../util/concurrency/value.h" +#include "../../util/concurrency/msg.h" +#include "../../util/net/hostandport.h" +#include "../commands.h" +#include "../oplog.h" +#include "../oplogreader.h" +#include "rs_exception.h" +#include "rs_optime.h" +#include "rs_member.h" +#include "rs_config.h" + +/** + * Order of Events + * + * On startup, if the --replSet option is present, startReplSets is called. + * startReplSets forks off a new thread for replica set activities. It creates + * the global theReplSet variable and calls go() on it. + * + * theReplSet's constructor changes the replica set's state to RS_STARTUP, + * starts the replica set manager, and loads the config (if the replica set + * has been initialized). + */ + +namespace mongo { + + struct HowToFixUp; + struct Target; + class DBClientConnection; + class ReplSetImpl; + class OplogReader; + extern bool replSet; // true if using repl sets + extern class ReplSet *theReplSet; // null until initialized + extern Tee *rsLog; + + /* member of a replica set */ + class Member : public List1<Member>::Base { + private: + ~Member(); // intentionally unimplemented as should never be called -- see List1<>::Base. 
+ Member(const Member&); + public: + Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self); + + string fullName() const { return h().toString(); } + const ReplSetConfig::MemberCfg& config() const { return _config; } + ReplSetConfig::MemberCfg& configw() { return _config; } + const HeartbeatInfo& hbinfo() const { return _hbinfo; } + HeartbeatInfo& get_hbinfo() { return _hbinfo; } + string lhb() const { return _hbinfo.lastHeartbeatMsg; } + MemberState state() const { return _hbinfo.hbstate; } + const HostAndPort& h() const { return _h; } + unsigned id() const { return _hbinfo.id(); } + + bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0 + void summarizeMember(stringstream& s) const; + + private: + friend class ReplSetImpl; + ReplSetConfig::MemberCfg _config; + const HostAndPort _h; + HeartbeatInfo _hbinfo; + }; + + namespace replset { + /** + * "Normal" replica set syncing + */ + class SyncTail : public Sync { + public: + virtual ~SyncTail() {} + SyncTail(const string& host) : Sync(host) {} + virtual bool syncApply(const BSONObj &o); + }; + + /** + * Initial clone and sync + */ + class InitialSync : public SyncTail { + public: + InitialSync(const string& host) : SyncTail(host) {} + virtual ~InitialSync() {} + bool oplogApplication(OplogReader& r, const Member* source, const OpTime& applyGTE, const OpTime& minValid); + virtual void applyOp(const BSONObj& o, const OpTime& minvalid); + }; + + // TODO: move hbmsg into an error-keeping class (SERVER-4444) + void sethbmsg(const string& s, const int logLevel=0); + + } // namespace replset + + class Manager : public task::Server { + ReplSetImpl *rs; + bool busyWithElectSelf; + int _primary; + + /** @param two - if true two primaries were seen. this can happen transiently, in addition to our + polling being only occasional. in this case null is returned, but the caller should + not assume primary itself in that situation. 
+ */ + const Member* findOtherPrimary(bool& two); + + void noteARemoteIsPrimary(const Member *); + void checkElectableSet(); + void checkAuth(); + virtual void starting(); + public: + Manager(ReplSetImpl *rs); + virtual ~Manager(); + void msgReceivedNewConfig(BSONObj); + void msgCheckNewState(); + }; + + class GhostSync : public task::Server { + struct GhostSlave : boost::noncopyable { + GhostSlave() : last(0), slave(0), init(false) { } + OplogReader reader; + OpTime last; + Member* slave; + bool init; + }; + /** + * This is a cache of ghost slaves + */ + typedef map< mongo::OID,shared_ptr<GhostSlave> > MAP; + MAP _ghostCache; + RWLock _lock; // protects _ghostCache + ReplSetImpl *rs; + virtual void starting(); + public: + GhostSync(ReplSetImpl *_rs) : task::Server("rsGhostSync"), _lock("GhostSync"), rs(_rs) {} + ~GhostSync() { + log() << "~GhostSync() called" << rsLog; + } + + /** + * Replica sets can sync in a hierarchical fashion, which throws off w + * calculation on the master. percolate() faux-syncs from an upstream + * node so that the primary will know what the slaves are up to. + * + * We can't just directly sync to the primary because it could be + * unreachable, e.g., S1--->S2--->S3--->P. S2 should ghost sync from S3 + * and S3 can ghost sync from the primary. + * + * Say we have an S1--->S2--->P situation and this node is S2. rid + * would refer to S1. S2 would create a ghost slave of S1 and connect + * it to P (_currentSyncTarget). Then it would use this connection to + * pretend to be S1, replicating off of P. 
+ */ + void percolate(const BSONObj& rid, const OpTime& last); + void associateSlave(const BSONObj& rid, const int memberId); + void updateSlave(const mongo::OID& id, const OpTime& last); + }; + + struct Target; + + class Consensus { + ReplSetImpl &rs; + struct LastYea { + LastYea() : when(0), who(0xffffffff) { } + time_t when; + unsigned who; + }; + static SimpleMutex lyMutex; + Guarded<LastYea,lyMutex> ly; + unsigned yea(unsigned memberId); // throws VoteException + void electionFailed(unsigned meid); + void _electSelf(); + bool weAreFreshest(bool& allUp, int& nTies); + bool sleptLast; // slept last elect() pass + public: + Consensus(ReplSetImpl *t) : rs(*t) { + sleptLast = false; + steppedDown = 0; + } + + /* if we've stepped down, this is when we are allowed to try to elect ourself again. + todo: handle possible weirdnesses at clock skews etc. + */ + time_t steppedDown; + + int totalVotes() const; + bool aMajoritySeemsToBeUp() const; + bool shouldRelinquish() const; + void electSelf(); + void electCmdReceived(BSONObj, BSONObjBuilder*); + void multiCommand(BSONObj cmd, list<Target>& L); + }; + + /** + * most operations on a ReplSet object should be done while locked. that + * logic implemented here. + * + * Order of locking: lock the replica set, then take a rwlock. + */ + class RSBase : boost::noncopyable { + public: + const unsigned magic; + void assertValid() { assert( magic == 0x12345677 ); } + private: + mongo::mutex m; + int _locked; + ThreadLocalValue<bool> _lockedByMe; + protected: + RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } + ~RSBase() { + /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */ + log() << "replSet ~RSBase called" << rsLog; + } + + public: + class lock { + RSBase& rsbase; + auto_ptr<scoped_lock> sl; + public: + lock(RSBase* b) : rsbase(*b) { + if( rsbase._lockedByMe.get() ) + return; // recursive is ok... 
+ + sl.reset( new scoped_lock(rsbase.m) ); + DEV assert(rsbase._locked == 0); + rsbase._locked++; + rsbase._lockedByMe.set(true); + } + ~lock() { + if( sl.get() ) { + assert( rsbase._lockedByMe.get() ); + DEV assert(rsbase._locked == 1); + rsbase._lockedByMe.set(false); + rsbase._locked--; + } + } + }; + + /* for asserts */ + bool locked() const { return _locked != 0; } + + /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another + just for asserts & such so we can make the contracts clear on who locks what when. + we don't use these locks that frequently, so the little bit of overhead is fine. + */ + bool lockedByMe() { return _lockedByMe.get(); } + }; + + class ReplSetHealthPollTask; + + /* safe container for our state that keeps member pointer and state variables always aligned */ + class StateBox : boost::noncopyable { + public: + struct SP { // SP is like pair<MemberState,const Member *> but nicer + SP() : state(MemberState::RS_STARTUP), primary(0) { } + MemberState state; + const Member *primary; + }; + const SP get() { + rwlock lk(m, false); + return sp; + } + MemberState getState() const { + rwlock lk(m, false); + return sp.state; + } + const Member* getPrimary() const { + rwlock lk(m, false); + return sp.primary; + } + void change(MemberState s, const Member *self) { + rwlock lk(m, true); + if( sp.state != s ) { + log() << "replSet " << s.toString() << rsLog; + } + sp.state = s; + if( s.primary() ) { + sp.primary = self; + } + else { + if( self == sp.primary ) + sp.primary = 0; + } + } + void set(MemberState s, const Member *p) { + rwlock lk(m, true); + sp.state = s; + sp.primary = p; + } + void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); } + void setOtherPrimary(const Member *mem) { + rwlock lk(m, true); + assert( !sp.state.primary() ); + sp.primary = mem; + } + void noteRemoteIsPrimary(const Member *remote) { + rwlock lk(m, true); + if( !sp.state.secondary() && 
!sp.state.fatal() ) + sp.state = MemberState::RS_RECOVERING; + sp.primary = remote; + } + StateBox() : m("StateBox") { } + private: + RWLock m; + SP sp; + }; + + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ); + + /** Parameter given to the --replSet command line option (parsed). + Syntax is "<setname>/<seedhost1>,<seedhost2>" + where setname is a name and seedhost is "<host>[:<port>]" */ + class ReplSetCmdline { + public: + ReplSetCmdline(string cfgString) { parseReplsetCmdLine(cfgString, setname, seeds, seedSet); } + string setname; + vector<HostAndPort> seeds; + set<HostAndPort> seedSet; + }; + + /* information about the entire repl set, such as the various servers in the set, and their state */ + /* note: We currently do not free mem when the set goes away - it is assumed the replset is a + singleton and long lived. + */ + class ReplSetImpl : protected RSBase { + public: + /** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */ + enum StartupStatus { + PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, + EMPTYUNREACHABLE=4, STARTED=5, SOON=6 + }; + static StartupStatus startupStatus; + static DiagStr startupStatusMsg; + static string stateAsHtml(MemberState state); + + /* todo thread */ + void msgUpdateHBInfo(HeartbeatInfo); + + StateBox box; + + OpTime lastOpTimeWritten; + long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork" + private: + set<ReplSetHealthPollTask*> healthTasks; + void endOldHealthTasks(); + void startHealthTaskFor(Member *m); + + Consensus elect; + void relinquish(); + void forgetPrimary(); + protected: + bool _stepDown(int secs); + bool _freeze(int secs); + private: + void assumePrimary(); + void loadLastOpTimeWritten(bool quiet=false); + void changeState(MemberState s); + + /** + * Find the closest member (using ping time) with a higher latest optime. 
+ */ + Member* getMemberToSyncTo(); + void veto(const string& host, unsigned secs=10); + Member* _currentSyncTarget; + + bool _blockSync; + void blockSync(bool block); + + // set of electable members' _ids + set<unsigned> _electableSet; + protected: + // "heartbeat message" + // sent in requestHeartbeat respond in field "hbm" + char _hbmsg[256]; // we change this unlocked, thus not an stl::string + time_t _hbmsgTime; // when it was logged + public: + void sethbmsg(string s, int logLevel = 0); + + /** + * Election with Priorities + * + * Each node (n) keeps a set of nodes that could be elected primary. + * Each node in this set: + * + * 1. can connect to a majority of the set + * 2. has a priority greater than 0 + * 3. has an optime within 10 seconds of the most up-to-date node + * that n can reach + * + * If a node fails to meet one or more of these criteria, it is removed + * from the list. This list is updated whenever the node receives a + * heartbeat. + * + * When a node sends an "am I freshest?" query, the node receiving the + * query checks their electable list to make sure that no one else is + * electable AND higher priority. If this check passes, the node will + * return an "ok" response, if not, it will veto. + * + * If a node is primary and there is another node with higher priority + * on the electable list (i.e., it must be synced to within 10 seconds + * of the current primary), the node (or nodes) with connections to both + * the primary and the secondary with higher priority will issue + * replSetStepDown requests to the primary to allow the higher-priority + * node to take over. 
+         */
+        void addToElectable(const unsigned m) { lock lk(this); _electableSet.insert(m); }
+        void rmFromElectable(const unsigned m) { lock lk(this); _electableSet.erase(m); }
+        bool iAmElectable() { lock lk(this); return _electableSet.find(_self->id()) != _electableSet.end(); }
+        bool isElectable(const unsigned id) { lock lk(this); return _electableSet.find(id) != _electableSet.end(); }
+        Member* getMostElectable();
+    protected:
+        /**
+         * Load a new config as the replica set's main config.
+         *
+         * If there is a "simple" change (just adding a node), this shortcuts
+         * the config. Returns true if the config was changed.  Returns false
+         * if the config doesn't include this node.  Throws an exception if
+         * something goes very wrong.
+         *
+         * Behavior to note:
+         * - locks this
+         * - intentionally leaks the old _cfg and any old _members (if the
+         *   change isn't strictly additive)
+         */
+        bool initFromConfig(ReplSetConfig& c, bool reconf=false);
+        void _fillIsMaster(BSONObjBuilder&);
+        void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
+        const ReplSetConfig& config() { return *_cfg; }
+        string name() const { return _name; } /* @return replica set's logical name */
+        MemberState state() const { return box.getState(); }
+        void _fatal();
+        void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
+        void _summarizeAsHtml(stringstream&) const;
+        void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command
+
+        /* throws exception if a problem initializing. */
+        ReplSetImpl(ReplSetCmdline&);
+
+        /* call after constructing to start - returns fairly quickly after launching its threads */
+        void _go();
+
+    private:
+        string _name;
+        const vector<HostAndPort> *_seeds;
+        ReplSetConfig *_cfg;
+
+        /**
+         * Finds the configuration with the highest version number and attempts
+         * to load it.
+ */ + bool _loadConfigFinish(vector<ReplSetConfig>& v); + /** + * Gather all possible configs (from command line seeds, our own config + * doc, and any hosts listed therein) and try to initiate from the most + * recent config we find. + */ + void loadConfig(); + + list<HostAndPort> memberHostnames() const; + const ReplSetConfig::MemberCfg& myConfig() const { return _config; } + bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } + bool iAmPotentiallyHot() const { + return myConfig().potentiallyHot() && // not an arbiter + elect.steppedDown <= time(0) && // not stepped down/frozen + state() == MemberState::RS_SECONDARY; // not stale + } + protected: + Member *_self; + bool _buildIndexes; // = _self->config().buildIndexes + void setSelfTo(Member *); // use this as it sets buildIndexes var + private: + List1<Member> _members; // all members of the set EXCEPT _self. + ReplSetConfig::MemberCfg _config; // config of _self + unsigned _id; // _id of _self + + int _maintenanceMode; // if we should stay in recovering state + public: + // this is called from within a writelock in logOpRS + unsigned selfId() const { return _id; } + Manager *mgr; + GhostSync *ghost; + /** + * This forces a secondary to go into recovering state and stay there + * until this is called again, passing in "false". Multiple threads can + * call this and it will leave maintenance mode once all of the callers + * have called it again, passing in false. 
+ */ + void setMaintenanceMode(const bool inc); + private: + Member* head() const { return _members.head(); } + public: + const Member* findById(unsigned id) const; + private: + void _getTargets(list<Target>&, int &configVersion); + void getTargets(list<Target>&, int &configVersion); + void startThreads(); + friend class FeedbackThread; + friend class CmdReplSetElect; + friend class Member; + friend class Manager; + friend class GhostSync; + friend class Consensus; + + private: + bool initialSyncOplogApplication(const OpTime& applyGTE, const OpTime& minValid); + void _syncDoInitialSync(); + void syncDoInitialSync(); + void _syncThread(); + bool tryToGoLiveAsASecondary(OpTime&); // readlocks + void syncTail(); + unsigned _syncRollback(OplogReader& r); + void syncRollback(OplogReader& r); + void syncFixUp(HowToFixUp& h, OplogReader& r); + + // get an oplog reader for a server with an oplog entry timestamp greater + // than or equal to minTS, if set. + Member* _getOplogReader(OplogReader& r, const OpTime& minTS); + + // check lastOpTimeWritten against the remote's earliest op, filling in + // remoteOldestOp. 
+        bool _isStale(OplogReader& r, const OpTime& minTS, BSONObj& remoteOldestOp);
+
+        // keep a list of hosts that we've tried recently that didn't work
+        map<string,time_t> _veto;
+    public:
+        void syncThread();
+        const OpTime lastOtherOpTime() const;
+    };
+
+    class ReplSet : public ReplSetImpl {
+    public:
+        ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { }
+
+        // for the replSetStepDown command
+        bool stepDown(int secs) { return _stepDown(secs); }
+
+        // for the replSetFreeze command
+        bool freeze(int secs) { return _freeze(secs); }
+
+        string selfFullName() {
+            assert( _self );
+            return _self->fullName();
+        }
+
+        bool buildIndexes() const { return _buildIndexes; }
+
+        /* call after constructing to start - returns fairly quickly after launching its threads */
+        void go() { _go(); }
+
+        void fatal() { _fatal(); }
+        bool isPrimary() { return box.getState().primary(); }
+        bool isSecondary() { return box.getState().secondary(); }
+        MemberState state() const { return ReplSetImpl::state(); }
+        string name() const { return ReplSetImpl::name(); }
+        const ReplSetConfig& config() { return ReplSetImpl::config(); }
+        void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { _getOplogDiagsAsHtml(server_id,ss); }
+        void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss); }
+        void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); }
+        void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
+
+        /**
+         * We have a new config (reconfig) - apply it.
+         * @param comment write a no-op comment to the oplog about it.  only
+         * makes sense if one is primary and initiating the reconf.
+         *
+         * The slaves are updated when they get a heartbeat indicating the new
+         * config.  The comment is a no-op.
+         */
+        void haveNewConfig(ReplSetConfig& c, bool comment);
+
+        /**
+         * Pointer assignment isn't necessarily atomic, so this needs to assure
+         * locking, even though we don't delete old configs.
+ */ + const ReplSetConfig& getConfig() { return config(); } + + bool lockedByMe() { return RSBase::lockedByMe(); } + + // heartbeat msg to send to others; descriptive diagnostic info + string hbmsg() const { + if( time(0)-_hbmsgTime > 120 ) return ""; + return _hbmsg; + } + }; + + /** + * Base class for repl set commands. Checks basic things such if we're in + * rs mode before the command does its real work. + */ + class ReplSetCommand : public Command { + protected: + ReplSetCommand(const char * s, bool show=false) : Command(s, show) { } + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return true; } + virtual bool logTheOp() { return false; } + virtual LockType locktype() const { return NONE; } + virtual void help( stringstream &help ) const { help << "internal"; } + + /** + * Some replica set commands call this and then call check(). This is + * intentional, as they might do things before theReplSet is initialized + * that still need to be checked for auth. + */ + bool checkAuth(string& errmsg, BSONObjBuilder& result) { + if( !noauth ) { + AuthenticationInfo *ai = cc().getAuthenticationInfo(); + if (!ai->isAuthorizedForLock("admin", locktype())) { + errmsg = "replSet command unauthorized"; + return false; + } + } + return true; + } + + bool check(string& errmsg, BSONObjBuilder& result) { + if( !replSet ) { + errmsg = "not running with --replSet"; + if( cmdLine.configsvr ) { + result.append("info", "configsvr"); // for shell prompt + } + return false; + } + + if( theReplSet == 0 ) { + result.append("startupStatus", ReplSet::startupStatus); + string s; + errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg.get(); + if( ReplSet::startupStatus == 3 ) + result.append("info", "run rs.initiate(...) 
if not yet done for the set"); + return false; + } + + return checkAuth(errmsg, result); + } + }; + + /** + * does local authentication + * directly authorizes against AuthenticationInfo + */ + void replLocalAuth(); + + /** inlines ----------------- */ + + inline Member::Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self) : + _config(*c), _h(h), _hbinfo(ord) { + assert(c); + if( self ) + _hbinfo.health = 1.0; + } + +} diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp new file mode 100644 index 00000000000..22137773aec --- /dev/null +++ b/src/mongo/db/repl/rs_config.cpp @@ -0,0 +1,662 @@ +// rs_config.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "rs.h" +#include "../../client/dbclient.h" +#include "../../client/syncclusterconnection.h" +#include "../../util/net/hostandport.h" +#include "../dbhelpers.h" +#include "connections.h" +#include "../oplog.h" +#include "../instance.h" +#include "../../util/text.h" +#include <boost/algorithm/string.hpp> + +using namespace bson; + +namespace mongo { + + void logOpInitiate(const bo&); + + void assertOnlyHas(BSONObj o, const set<string>& fields) { + BSONObj::iterator i(o); + while( i.more() ) { + BSONElement e = i.next(); + if( !fields.count( e.fieldName() ) ) { + uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "' in object"); + } + } + } + + list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { + list<HostAndPort> L; + for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) { + if( !i->h.isSelf() ) + L.push_back(i->h); + } + return L; + } + + /* comment MUST only be set when initiating the set by the initiator */ + void ReplSetConfig::saveConfigLocally(bo comment) { + checkRsConfig(); + log() << "replSet info saving a newer config version to local.system.replset" << rsLog; + { + writelock lk(""); + Client::Context cx( rsConfigNs ); + cx.db()->flushFiles(true); + + //theReplSet->lastOpTimeWritten = ??; + //rather than above, do a logOp()? 
probably + BSONObj o = asBson(); + Helpers::putSingletonGod(rsConfigNs.c_str(), o, false/*logOp=false; local db so would work regardless...*/); + if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) ) + logOpInitiate(comment); + + cx.db()->flushFiles(true); + } + log() << "replSet saveConfigLocally done" << rsLog; + } + + bo ReplSetConfig::MemberCfg::asBson() const { + bob b; + b << "_id" << _id; + b.append("host", h.dynString()); + if( votes != 1 ) b << "votes" << votes; + if( priority != 1.0 ) b << "priority" << priority; + if( arbiterOnly ) b << "arbiterOnly" << true; + if( slaveDelay ) b << "slaveDelay" << slaveDelay; + if( hidden ) b << "hidden" << hidden; + if( !buildIndexes ) b << "buildIndexes" << buildIndexes; + if( !tags.empty() ) { + BSONObjBuilder a; + for( map<string,string>::const_iterator i = tags.begin(); i != tags.end(); i++ ) + a.append((*i).first, (*i).second); + b.append("tags", a.done()); + } + return b.obj(); + } + + void ReplSetConfig::updateMembers(List1<Member> &dest) { + for (vector<MemberCfg>::iterator source = members.begin(); source < members.end(); source++) { + for( Member *d = dest.head(); d; d = d->next() ) { + if (d->fullName() == (*source).h.toString()) { + d->configw().groupsw() = (*source).groups(); + } + } + } + } + + bo ReplSetConfig::asBson() const { + bob b; + b.append("_id", _id).append("version", version); + + BSONArrayBuilder a; + for( unsigned i = 0; i < members.size(); i++ ) + a.append( members[i].asBson() ); + b.append("members", a.arr()); + + if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() || !rules.empty()) { + bob settings; + if( !rules.empty() ) { + bob modes; + for (map<string,TagRule*>::const_iterator it = rules.begin(); it != rules.end(); it++) { + bob clauses; + vector<TagClause*> r = (*it).second->clauses; + for (vector<TagClause*>::iterator it2 = r.begin(); it2 < r.end(); it2++) { + clauses << (*it2)->name << (*it2)->target; + } + modes << (*it).first << clauses.obj(); + } + settings << 
"getLastErrorModes" << modes.obj(); + } + if( !getLastErrorDefaults.isEmpty() ) + settings << "getLastErrorDefaults" << getLastErrorDefaults; + b << "settings" << settings.obj(); + } + + return b.obj(); + } + + static inline void mchk(bool expr) { + uassert(13126, "bad Member config", expr); + } + + void ReplSetConfig::MemberCfg::check() const { + mchk(_id >= 0 && _id <= 255); + mchk(priority >= 0 && priority <= 1000); + mchk(votes <= 100); // votes >= 0 because it is unsigned + uassert(13419, "priorities must be between 0.0 and 100.0", priority >= 0.0 && priority <= 100.0); + uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0); + uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366); + uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden); + uassert(13477, "priority must be 0 when buildIndexes=false", buildIndexes || priority == 0); + } +/* + string ReplSetConfig::TagSubgroup::toString() const { + bool first = true; + string result = "\""+name+"\": ["; + for (set<const MemberCfg*>::const_iterator i = m.begin(); i != m.end(); i++) { + if (!first) { + result += ", "; + } + first = false; + result += (*i)->h.toString(); + } + return result+"]"; + } + */ + string ReplSetConfig::TagClause::toString() const { + string result = name+": {"; + for (map<string,TagSubgroup*>::const_iterator i = subgroups.begin(); i != subgroups.end(); i++) { +//TEMP? 
result += (*i).second->toString()+", "; + } + result += "TagClause toString TEMPORARILY DISABLED"; + return result + "}"; + } + + string ReplSetConfig::TagRule::toString() const { + string result = "{"; + for (vector<TagClause*>::const_iterator it = clauses.begin(); it < clauses.end(); it++) { + result += ((TagClause*)(*it))->toString()+","; + } + return result+"}"; + } + + void ReplSetConfig::TagSubgroup::updateLast(const OpTime& op) { + RACECHECK + if (last < op) { + last = op; + + for (vector<TagClause*>::iterator it = clauses.begin(); it < clauses.end(); it++) { + (*it)->updateLast(op); + } + } + } + + void ReplSetConfig::TagClause::updateLast(const OpTime& op) { + RACECHECK + if (last >= op) { + return; + } + + // check at least n subgroups greater than clause.last + int count = 0; + map<string,TagSubgroup*>::iterator it; + for (it = subgroups.begin(); it != subgroups.end(); it++) { + if ((*it).second->last >= op) { + count++; + } + } + + if (count >= actualTarget) { + last = op; + rule->updateLast(op); + } + } + + void ReplSetConfig::TagRule::updateLast(const OpTime& op) { + OpTime *earliest = (OpTime*)&op; + vector<TagClause*>::iterator it; + + for (it = clauses.begin(); it < clauses.end(); it++) { + if ((*it)->last < *earliest) { + earliest = &(*it)->last; + } + } + + // rules are simply and-ed clauses, so whatever the most-behind + // clause is at is what the rule is at + last = *earliest; + } + + /** @param o old config + @param n new config + */ + /*static*/ + bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) { + assert( theReplSet ); + + if( o._id != n._id ) { + errmsg = "set name may not change"; + return false; + } + /* TODO : wonder if we need to allow o.version < n.version only, which is more lenient. + if someone had some intermediate config this node doesnt have, that could be + necessary. but then how did we become primary? so perhaps we are fine as-is. 
+ */ + if( o.version >= n.version ) { + errmsg = str::stream() << "version number must increase, old: " + << o.version << " new: " << n.version; + return false; + } + + map<HostAndPort,const ReplSetConfig::MemberCfg*> old; + bool isLocalHost = false; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { + if (i->h.isLocalHost()) { + isLocalHost = true; + } + old[i->h] = &(*i); + } + int me = 0; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { + const ReplSetConfig::MemberCfg& m = *i; + if ( (isLocalHost && !m.h.isLocalHost()) || (!isLocalHost && m.h.isLocalHost())) { + log() << "reconfig error, cannot switch between localhost and hostnames: " + << m.h.toString() << rsLog; + uasserted(13645, "hosts cannot switch between localhost and hostname"); + } + if( old.count(m.h) ) { + const ReplSetConfig::MemberCfg& oldCfg = *old[m.h]; + if( oldCfg._id != m._id ) { + log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; + uasserted(13432, "_id may not change for members"); + } + if( oldCfg.buildIndexes != m.buildIndexes ) { + log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; + uasserted(13476, "buildIndexes may not change for members"); + } + /* are transitions to and from arbiterOnly guaranteed safe? if not, we should disallow here. + there is a test at replsets/replsetarb3.js */ + if( oldCfg.arbiterOnly != m.arbiterOnly ) { + log() << "replSet reconfig error with member: " << m.h.toString() << " arbiterOnly cannot change. 
remove and readd the member instead " << rsLog; + uasserted(13510, "arbiterOnly may not change for members"); + } + } + if( m.h.isSelf() ) + me++; + } + + uassert(13433, "can't find self in new replset config", me == 1); + + return true; + } + + void ReplSetConfig::clear() { + version = -5; + _ok = false; + } + + void ReplSetConfig::setMajority() { + int total = members.size(); + int nonArbiters = total; + int strictMajority = total/2+1; + + for (vector<MemberCfg>::iterator it = members.begin(); it < members.end(); it++) { + if ((*it).arbiterOnly) { + nonArbiters--; + } + } + + // majority should be all "normal" members if we have something like 4 + // arbiters & 3 normal members + _majority = (strictMajority > nonArbiters) ? nonArbiters : strictMajority; + } + + int ReplSetConfig::getMajority() const { + return _majority; + } + + void ReplSetConfig::checkRsConfig() const { + uassert(13132, + str::stream() << "nonmatching repl set name in _id field: " << _id << " vs. " << cmdLine.ourSetName(), + _id == cmdLine.ourSetName()); + uassert(13308, "replSet bad config version #", version > 0); + uassert(13133, "replSet bad config no members", members.size() >= 1); + uassert(13309, "replSet bad config maximum number of members is 12", members.size() <= 12); + { + unsigned voters = 0; + for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); ++i ) { + if( i->votes ) + voters++; + } + uassert(13612, "replSet bad config maximum number of voting members is 7", voters <= 7); + uassert(13613, "replSet bad config no voting members", voters > 0); + } + } + + void ReplSetConfig::_populateTagMap(map<string,TagClause> &tagMap) { + // create subgroups for each server corresponding to each of + // its tags. 
E.g.: + // + // A is tagged with {"server" : "A", "dc" : "ny"} + // B is tagged with {"server" : "B", "dc" : "ny"} + // + // At the end of this step, tagMap will contain: + // + // "server" => {"A" : [A], "B" : [B]} + // "dc" => {"ny" : [A,B]} + + for (unsigned i=0; i<members.size(); i++) { + MemberCfg member = members[i]; + + for (map<string,string>::iterator tag = member.tags.begin(); tag != member.tags.end(); tag++) { + string label = (*tag).first; + string value = (*tag).second; + + TagClause& clause = tagMap[label]; + clause.name = label; + + TagSubgroup* subgroup; + // search for "ny" in "dc"'s clause + if (clause.subgroups.find(value) == clause.subgroups.end()) { + clause.subgroups[value] = subgroup = new TagSubgroup(value); + } + else { + subgroup = clause.subgroups[value]; + } + + subgroup->m.insert(&members[i]); + } + } + } + + void ReplSetConfig::parseRules(const BSONObj& modes) { + map<string,TagClause> tagMap; + _populateTagMap(tagMap); + + for (BSONObj::iterator i = modes.begin(); i.more(); ) { + unsigned int primaryOnly = 0; + + // ruleName : {dc : 2, m : 3} + BSONElement rule = i.next(); + uassert(14046, "getLastErrorMode rules must be objects", rule.type() == mongo::Object); + + TagRule* r = new TagRule(); + + BSONObj clauseObj = rule.Obj(); + for (BSONObj::iterator c = clauseObj.begin(); c.more(); ) { + BSONElement clauseElem = c.next(); + uassert(14829, "getLastErrorMode criteria must be numeric", clauseElem.isNumber()); + + // get the clause, e.g., "x.y" : 3 + const char *criteria = clauseElem.fieldName(); + int value = clauseElem.numberInt(); + uassert(14828, str::stream() << "getLastErrorMode criteria must be greater than 0: " << clauseElem, value > 0); + + TagClause* node = new TagClause(tagMap[criteria]); + + int numGroups = node->subgroups.size(); + uassert(14831, str::stream() << "mode " << clauseObj << " requires " + << value << " tagged with " << criteria << ", but only " + << numGroups << " with this tag were found", numGroups >= 
value); + + node->name = criteria; + node->target = value; + // if any subgroups contain "me", we can decrease the target + node->actualTarget = node->target; + + // then we want to add pointers between clause & subgroup + for (map<string,TagSubgroup*>::iterator sgs = node->subgroups.begin(); + sgs != node->subgroups.end(); sgs++) { + bool foundMe = false; + (*sgs).second->clauses.push_back(node); + + // if this subgroup contains the primary, it's automatically always up-to-date + for( set<MemberCfg*>::const_iterator cfg = (*sgs).second->m.begin(); + cfg != (*sgs).second->m.end(); + cfg++) + { + if ((*cfg)->h.isSelf()) { + node->actualTarget--; + foundMe = true; + } + } + + for (set<MemberCfg *>::iterator cfg = (*sgs).second->m.begin(); + !foundMe && cfg != (*sgs).second->m.end(); cfg++) { + (*cfg)->groupsw().insert((*sgs).second); + } + } + + // if all of the members of this clause involve the primary, it's always up-to-date + if (node->actualTarget == 0) { + node->last = OpTime(INT_MAX, INT_MAX); + primaryOnly++; + } + + // this is a valid clause, so we want to add it to its rule + node->rule = r; + r->clauses.push_back(node); + } + + // if all of the clauses are satisfied by the primary, this rule is trivially true + if (primaryOnly == r->clauses.size()) { + r->last = OpTime(INT_MAX, INT_MAX); + } + + // if we got here, this is a valid rule + LOG(1) << "replSet new rule " << rule.fieldName() << ": " << r->toString() << rsLog; + rules[rule.fieldName()] = r; + } + } + + void ReplSetConfig::from(BSONObj o) { + static const string legal[] = {"_id","version", "members","settings"}; + static const set<string> legals(legal, legal + 4); + assertOnlyHas(o, legals); + + md5 = o.md5(); + _id = o["_id"].String(); + if( o["version"].ok() ) { + version = o["version"].numberInt(); + uassert(13115, "bad " + rsConfigNs + " config: version", version > 0); + } + + set<string> hosts; + set<int> ords; + vector<BSONElement> members; + try { + members = o["members"].Array(); + } + 
catch(...) { + uasserted(13131, "replSet error parsing (or missing) 'members' field in config object"); + } + + unsigned localhosts = 0; + for( unsigned i = 0; i < members.size(); i++ ) { + BSONObj mobj = members[i].Obj(); + MemberCfg m; + try { + static const string legal[] = { + "_id","votes","priority","host", "hidden","slaveDelay", + "arbiterOnly","buildIndexes","tags","initialSync" // deprecated + }; + static const set<string> legals(legal, legal + 10); + assertOnlyHas(mobj, legals); + + try { + m._id = (int) mobj["_id"].Number(); + } + catch(...) { + /* TODO: use of string exceptions may be problematic for reconfig case! */ + throw "_id must be numeric"; + } + try { + string s = mobj["host"].String(); + boost::trim(s); + m.h = HostAndPort(s); + if ( !m.h.hasPort() ) { + // make port explicit even if default + m.h.setPort(m.h.port()); + } + } + catch(...) { + throw string("bad or missing host field? ") + mobj.toString(); + } + if( m.h.isLocalHost() ) + localhosts++; + m.arbiterOnly = mobj["arbiterOnly"].trueValue(); + m.slaveDelay = mobj["slaveDelay"].numberInt(); + if( mobj.hasElement("hidden") ) + m.hidden = mobj["hidden"].trueValue(); + if( mobj.hasElement("buildIndexes") ) + m.buildIndexes = mobj["buildIndexes"].trueValue(); + if( mobj.hasElement("priority") ) + m.priority = mobj["priority"].Number(); + if( mobj.hasElement("votes") ) + m.votes = (unsigned) mobj["votes"].Number(); + if( mobj.hasElement("tags") ) { + const BSONObj &t = mobj["tags"].Obj(); + for (BSONObj::iterator c = t.begin(); c.more(); c.next()) { + m.tags[(*c).fieldName()] = (*c).String(); + } + uassert(14827, "arbiters cannot have tags", !m.arbiterOnly || m.tags.empty() ); + } + m.check(); + } + catch( const char * p ) { + log() << "replSet cfg parsing exception for members[" << i << "] " << p << rsLog; + stringstream ss; + ss << "replSet members[" << i << "] " << p; + uassert(13107, ss.str(), false); + } + catch(DBException& e) { + log() << "replSet cfg parsing exception for members[" 
<< i << "] " << e.what() << rsLog; + stringstream ss; + ss << "bad config for member[" << i << "] " << e.what(); + uassert(13135, ss.str(), false); + } + if( !(ords.count(m._id) == 0 && hosts.count(m.h.toString()) == 0) ) { + log() << "replSet " << o.toString() << rsLog; + uassert(13108, "bad replset config -- duplicate hosts in the config object?", false); + } + hosts.insert(m.h.dynString()); + ords.insert(m._id); + this->members.push_back(m); + } + uassert(13393, "can't use localhost in repl set member names except when using it for all members", localhosts == 0 || localhosts == members.size()); + uassert(13117, "bad " + rsConfigNs + " config", !_id.empty()); + + if( o["settings"].ok() ) { + BSONObj settings = o["settings"].Obj(); + if( settings["getLastErrorModes"].ok() ) { + parseRules(settings["getLastErrorModes"].Obj()); + } + ho.check(); + try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } + catch(...) { } + } + + // figure out the majority for this config + setMajority(); + } + + static inline void configAssert(bool expr) { + uassert(13122, "bad repl set config?", expr); + } + + ReplSetConfig::ReplSetConfig(BSONObj cfg, bool force) { + _constructed = false; + clear(); + from(cfg); + if( force ) { + version += rand() % 100000 + 10000; + } + configAssert( version < 0 /*unspecified*/ || (version >= 1) ); + if( version < 1 ) + version = 1; + _ok = true; + _constructed = true; + } + + ReplSetConfig::ReplSetConfig(const HostAndPort& h) { + LOG(2) << "ReplSetConfig load " << h.toStringLong() << rsLog; + + _constructed = false; + clear(); + int level = 2; + DEV level = 0; + + BSONObj cfg; + int v = -5; + try { + if( h.isSelf() ) { + ; + } + else { + /* first, make sure other node is configured to be a replset. just to be safe. 
*/ + string setname = cmdLine.ourSetName(); + BSONObj cmd = BSON( "replSetHeartbeat" << setname ); + int theirVersion; + BSONObj info; + log() << "trying to contact " << h.toString() << rsLog; + bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion); + if( info["rs"].trueValue() ) { + // yes, it is a replicate set, although perhaps not yet initialized + } + else { + if( !ok ) { + log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog; + if( !info.isEmpty() ) + log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog; + return; + } + { + stringstream ss; + ss << "replSet error: member " << h.toString() << " is not in --replSet mode"; + msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught + //for python err# checker: uassert(13260, "", false); + } + } + } + + v = -4; + unsigned long long count = 0; + try { + ScopedConn conn(h.toString()); + v = -3; + cfg = conn.findOne(rsConfigNs, Query()).getOwned(); + count = conn.count(rsConfigNs); + } + catch ( DBException& ) { + if ( !h.isSelf() ) { + throw; + } + + // on startup, socket is not listening yet + DBDirectClient cli; + cfg = cli.findOne( rsConfigNs, Query() ).getOwned(); + count = cli.count(rsConfigNs); + } + + if( count > 1 ) + uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString()); + + if( cfg.isEmpty() ) { + version = EMPTYCONFIG; + return; + } + version = -1; + } + catch( DBException& e) { + version = v; + log(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog; + return; + } + + from(cfg); + checkRsConfig(); + _ok = true; + log(level) << "replSet load config ok from " << (h.isSelf() ? 
"self" : h.toString()) << rsLog; + _constructed = true; + } + +} diff --git a/src/mongo/db/repl/rs_config.h b/src/mongo/db/repl/rs_config.h new file mode 100644 index 00000000000..cfe2e86a568 --- /dev/null +++ b/src/mongo/db/repl/rs_config.h @@ -0,0 +1,251 @@ +// rs_config.h +// repl set configuration +// + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../../util/net/hostandport.h" +#include "../../util/concurrency/race.h" +#include "health.h" + +namespace mongo { + class Member; + const string rsConfigNs = "local.system.replset"; + + class ReplSetConfig { + enum { EMPTYCONFIG = -2 }; + struct TagSubgroup; + public: + /** + * This contacts the given host and tries to get a config from them. + * + * This sends a test heartbeat to the host and, if all goes well and the + * host has a more recent config, fetches the config and loads it (see + * from(). + * + * If it's contacting itself, it skips the heartbeat (for obvious + * reasons.) If something is misconfigured, throws an exception. If the + * host couldn't be queried or is just blank, ok() will be false. 
+ */ + ReplSetConfig(const HostAndPort& h); + + ReplSetConfig(BSONObj cfg, bool force=false); + + bool ok() const { return _ok; } + + struct TagRule; + + struct MemberCfg { + MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false), buildIndexes(true) { } + int _id; /* ordinal */ + unsigned votes; /* how many votes this node gets. default 1. */ + HostAndPort h; + double priority; /* 0 means can never be primary */ + bool arbiterOnly; + int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */ + bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */ + bool buildIndexes; /* if false, do not create any non-_id indexes */ + map<string,string> tags; /* tagging for data center, rack, etc. */ + private: + set<TagSubgroup*> _groups; // the subgroups this member belongs to + public: + const set<TagSubgroup*>& groups() const { + return _groups; + } + set<TagSubgroup*>& groupsw() { + return _groups; + } + void check() const; /* check validity, assert if not. 
*/ + BSONObj asBson() const; + bool potentiallyHot() const { return !arbiterOnly && priority > 0; } + void updateGroups(const OpTime& last) { + RACECHECK + for (set<TagSubgroup*>::const_iterator it = groups().begin(); it != groups().end(); it++) { + ((TagSubgroup*)(*it))->updateLast(last); + } + } + bool operator==(const MemberCfg& r) const { + if (!tags.empty() || !r.tags.empty()) { + if (tags.size() != r.tags.size()) { + return false; + } + + // if they are the same size and not equal, at least one + // element in A must be different in B + for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) { + map<string,string>::const_iterator rit = r.tags.find((*lit).first); + + if (rit == r.tags.end() || (*lit).second != (*rit).second) { + return false; + } + } + } + + return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && + arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden && + buildIndexes == buildIndexes; + } + bool operator!=(const MemberCfg& r) const { return !(*this == r); } + }; + + vector<MemberCfg> members; + string _id; + int version; + HealthOptions ho; + string md5; + BSONObj getLastErrorDefaults; + map<string,TagRule*> rules; + + list<HostAndPort> otherMemberHostnames() const; // except self + + /** @return true if could connect, and there is no cfg object there at all */ + bool empty() const { return version == EMPTYCONFIG; } + + string toString() const { return asBson().toString(); } + + /** validate the settings. does not call check() on each member, you have to do that separately. 
*/ + void checkRsConfig() const; + + /** check if modification makes sense */ + static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg); + + //static void receivedNewConfig(BSONObj); + void saveConfigLocally(BSONObj comment); // to local db + string saveConfigEverywhere(); // returns textual info on what happened + + /** + * Update members' groups when the config changes but members stay the same. + */ + void updateMembers(List1<Member> &dest); + + BSONObj asBson() const; + + /** + * Getter and setter for _majority. This is almost always + * members.size()/2+1, but can be the number of non-arbiter members if + * there are more arbiters than non-arbiters (writing to 3 out of 7 + * servers is safe if 4 of the servers are arbiters). + */ + void setMajority(); + int getMajority() const; + + bool _constructed; + private: + bool _ok; + int _majority; + + void from(BSONObj); + void clear(); + + struct TagClause; + + /** + * This is a logical grouping of servers. It is pointed to by a set of + * servers with a certain tag. + * + * For example, suppose servers A, B, and C have the tag "dc" : "nyc". If we + * have a rule {"dc" : 2}, then we want A _or_ B _or_ C to have the + * write for one of the "dc" critiria to be fulfilled, so all three will + * point to this subgroup. When one of their oplog-tailing cursors is + * updated, this subgroup is updated. + */ + struct TagSubgroup : boost::noncopyable { + ~TagSubgroup(); // never called; not defined + TagSubgroup(string nm) : name(nm) { } + const string name; + OpTime last; + vector<TagClause*> clauses; + + // this probably won't actually point to valid members after the + // subgroup is created, as initFromConfig() makes a copy of the + // config + set<MemberCfg*> m; + + void updateLast(const OpTime& op); + + //string toString() const; + + /** + * If two tags have the same name, they should compare as equal so + * that members don't have to update two identical groups on writes. 
+ */ + bool operator() (TagSubgroup& lhs, TagSubgroup& rhs) const { + return lhs.name < rhs.name; + } + }; + + /** + * An argument in a rule. For example, if we had the rule {dc : 2, + * machines : 3}, "dc" : 2 and "machines" : 3 would be two TagClauses. + * + * Each tag clause has a set of associated subgroups. For example, if + * we had "dc" : 2, our subgroups might be "nyc", "sf", and "hk". + */ + struct TagClause { + OpTime last; + map<string,TagSubgroup*> subgroups; + TagRule *rule; + string name; + /** + * If we have get a clause like {machines : 3} and this server is + * tagged with "machines", then it's really {machines : 2}, as we + * will always be up-to-date. So, target would be 3 and + * actualTarget would be 2, in that example. + */ + int target; + int actualTarget; + + void updateLast(const OpTime& op); + string toString() const; + }; + + /** + * Parses getLastErrorModes. + */ + void parseRules(const BSONObj& modes); + + /** + * Create a hash containing every possible clause that could be used in a + * rule and the servers related to that clause. 
     * For example, suppose we have the following servers:
     * A {"dc" : "ny", "ny" : "rk1"}
     * B {"dc" : "ny", "ny" : "rk1"}
     * C {"dc" : "ny", "ny" : "rk2"}
     * D {"dc" : "sf", "sf" : "rk1"}
     * E {"dc" : "sf", "sf" : "rk2"}
     *
     * This would give us the possible criteria:
     * "dc" -> {A, B, C},{D, E}
     * "ny" -> {A, B},{C}
     * "sf" -> {D},{E}
     */
        void _populateTagMap(map<string,TagClause> &tagMap);

    public:
        /** A getLastErrorModes rule: the set of tag clauses that must all be
            satisfied, plus the last optime known to satisfy them. */
        struct TagRule {
            vector<TagClause*> clauses;
            OpTime last;

            void updateLast(const OpTime& op);
            string toString() const;
        };
    };

}
diff --git a/src/mongo/db/repl/rs_exception.h b/src/mongo/db/repl/rs_exception.h
new file mode 100644
index 00000000000..fc372fc241c
--- /dev/null
+++ b/src/mongo/db/repl/rs_exception.h
@@ -0,0 +1,17 @@
// @file rs_exception.h

#pragma once

namespace mongo {

    /** Control-flow exception used by replica set code; carries no state beyond
        its what() string. Presumably thrown while soliciting/granting election
        votes (see consensus.cpp) -- confirm against throw sites. */
    class VoteException : public std::exception {
    public:
        const char * what() const throw () { return "VoteException"; }
    };

    /** Control-flow exception signalling the caller should sleep and retry the
        operation rather than treat the failure as fatal -- confirm against
        throw sites, which are outside this file. */
    class RetryAfterSleepException : public std::exception {
    public:
        const char * what() const throw () { return "RetryAfterSleepException"; }
    };

}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
new file mode 100644
index 00000000000..b67c0d71b83
--- /dev/null
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -0,0 +1,271 @@
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/ + +#include "pch.h" +#include "../repl.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../oplogreader.h" +#include "../../util/mongoutils/str.h" +#include "../dbhelpers.h" +#include "rs_optime.h" +#include "../oplog.h" + +namespace mongo { + + using namespace mongoutils; + using namespace bson; + + void dropAllDatabasesExceptLocal(); + + // add try/catch with sleep + + void isyncassert(const string& msg, bool expr) { + if( !expr ) { + string m = str::stream() << "initial sync " << msg; + theReplSet->sethbmsg(m, 0); + uasserted(13404, m); + } + } + + void ReplSetImpl::syncDoInitialSync() { + createOplog(); + + while( 1 ) { + try { + _syncDoInitialSync(); + break; + } + catch(DBException& e) { + sethbmsg("initial sync exception " + e.toString(), 0); + sleepsecs(30); + } + } + } + + /* todo : progress metering to sethbmsg. */ + static bool clone(const char *master, string db) { + string err; + return cloneFrom(master, err, db, false, + /* slave_ok */ true, true, false, /*mayYield*/true, /*mayBeInterrupted*/false); + } + + void _logOpObjRS(const BSONObj& op); + + static void emptyOplog() { + writelock lk(rsoplog); + Client::Context ctx(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); + + // temp + if( d && d->stats.nrecords == 0 ) + return; // already empty, ok. 
+ + LOG(1) << "replSet empty oplog" << rsLog; + d->emptyCappedCollection(rsoplog); + } + + Member* ReplSetImpl::getMemberToSyncTo() { + Member *closest = 0; + time_t now = 0; + bool buildIndexes = true; + + // wait for 2N pings before choosing a sync target + if (_cfg) { + int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; + + if (needMorePings > 0) { + OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; + return NULL; + } + + buildIndexes = myConfig().buildIndexes; + } + + // find the member with the lowest ping time that has more data than me + for (Member *m = _members.head(); m; m = m->next()) { + if (m->hbinfo().up() && + // make sure members with buildIndexes sync from other members w/indexes + (!buildIndexes || (buildIndexes && m->config().buildIndexes)) && + (m->state() == MemberState::RS_PRIMARY || + (m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) && + (!closest || m->hbinfo().ping < closest->hbinfo().ping)) { + + map<string,time_t>::iterator vetoed = _veto.find(m->fullName()); + if (vetoed == _veto.end()) { + closest = m; + break; + } + + if (now == 0) { + now = time(0); + } + + // if this was on the veto list, check if it was vetoed in the last "while" + if ((*vetoed).second < now) { + _veto.erase(vetoed); + closest = m; + break; + } + + // if it was recently vetoed, skip + log() << "replSet not trying to sync from " << (*vetoed).first + << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog; + } + } + + { + lock lk(this); + + if (!closest) { + _currentSyncTarget = NULL; + return NULL; + } + + _currentSyncTarget = closest; + } + + sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); + + return closest; + } + + void ReplSetImpl::veto(const string& host, const unsigned secs) { + _veto[host] = time(0)+secs; + } + + /** + * Do the initial sync for this member. 
+ */ + void ReplSetImpl::_syncDoInitialSync() { + sethbmsg("initial sync pending",0); + + // if this is the first node, it may have already become primary + if ( box.getState().primary() ) { + sethbmsg("I'm already primary, no need for initial sync",0); + return; + } + + const Member *source = getMemberToSyncTo(); + if (!source) { + sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); + sleepsecs(15); + return; + } + + string sourceHostname = source->h().toString(); + OplogReader r; + if( !r.connect(sourceHostname) ) { + sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); + sleepsecs(15); + return; + } + + BSONObj lastOp = r.getLastOp(rsoplog); + if( lastOp.isEmpty() ) { + sethbmsg("initial sync couldn't read remote oplog", 0); + sleepsecs(15); + return; + } + OpTime startingTS = lastOp["ts"]._opTime(); + + if (replSettings.fastsync) { + log() << "fastsync: skipping database clone" << rsLog; + } + else { + sethbmsg("initial sync drop all databases", 0); + dropAllDatabasesExceptLocal(); + + sethbmsg("initial sync clone all databases", 0); + + list<string> dbs = r.conn()->getDatabaseNames(); + for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { + string db = *i; + if( db != "local" ) { + sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + bool ok; + { + writelock lk(db); + Client::Context ctx(db); + ok = clone(sourceHostname.c_str(), db); + } + if( !ok ) { + sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); + veto(source->fullName(), 600); + sleepsecs(300); + return; + } + } + } + } + + sethbmsg("initial sync query minValid",0); + + /* our cloned copy will be strange until we apply oplog events that occurred + through the process. we note that time point here. 
*/ + BSONObj minValid = r.getLastOp(rsoplog); + isyncassert( "getLastOp is empty ", !minValid.isEmpty() ); + OpTime mvoptime = minValid["ts"]._opTime(); + assert( !mvoptime.isNull() ); + assert( mvoptime >= startingTS ); + + // apply startingTS..mvoptime portion of the oplog + { + // note we assume here that this call does not throw + if( ! initialSyncOplogApplication(startingTS, mvoptime) ) { + log() << "replSet initial sync failed during oplog application phase" << rsLog; + + emptyOplog(); // otherwise we'll be up! + + lastOpTimeWritten = OpTime(); + lastH = 0; + + log() << "replSet cleaning up [1]" << rsLog; + { + writelock lk("local."); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } + log() << "replSet cleaning up [2]" << rsLog; + + log() << "replSet initial sync failed will try again" << endl; + + sleepsecs(5); + return; + } + } + + sethbmsg("initial sync finishing up",0); + + assert( !box.getState().primary() ); // wouldn't make sense if we were. + + { + writelock lk("local."); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + try { + log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog; + } + catch(...) { } + Helpers::putSingleton("local.replset.minvalid", minValid); + cx.db()->flushFiles(true); + } + + sethbmsg("initial sync done",0); + } + +} diff --git a/src/mongo/db/repl/rs_initiate.cpp b/src/mongo/db/repl/rs_initiate.cpp new file mode 100644 index 00000000000..77bc6c03938 --- /dev/null +++ b/src/mongo/db/repl/rs_initiate.cpp @@ -0,0 +1,269 @@ +/* @file rs_initiate.cpp + */ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. 
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../cmdline.h" +#include "../commands.h" +#include "../../util/mmap.h" +#include "../../util/mongoutils/str.h" +#include "health.h" +#include "rs.h" +#include "rs_config.h" +#include "../dbhelpers.h" +#include "../oplog.h" + +using namespace bson; +using namespace mongoutils; + +namespace mongo { + + /* called on a reconfig AND on initiate + throws + @param initial true when initiating + */ + void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial) { + int failures = 0, allVotes = 0, allowableFailures = 0; + int me = 0; + stringstream selfs; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { + if( i->h.isSelf() ) { + me++; + if( me > 1 ) + selfs << ','; + selfs << i->h.toString(); + if( !i->potentiallyHot() ) { + uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary"); + } + } + allVotes += i->votes; + } + allowableFailures = allVotes - (allVotes/2 + 1); + + uassert(13278, "bad config: isSelf is true for multiple hosts: " + selfs.str(), me <= 1); // dups? 
+ if( me != 1 ) { + stringstream ss; + ss << "can't find self in the replset config"; + if( !cmdLine.isDefaultPort() ) ss << " my port: " << cmdLine.port; + if( me != 0 ) ss << " found: " << me; + uasserted(13279, ss.str()); + } + + vector<string> down; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { + // we know we're up + if (i->h.isSelf()) { + continue; + } + + BSONObj res; + { + bool ok = false; + try { + int theirVersion = -1000; + ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/); + if( theirVersion >= cfg.version ) { + stringstream ss; + ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure"; + uasserted(13259, ss.str()); + } + } + catch(DBException& e) { + log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog; + } + catch(...) { + log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog; + } + if( res.getBoolField("mismatch") ) + uasserted(13145, "set name does not match the set name host " + i->h.toString() + " expects"); + if( *res.getStringField("set") ) { + if( cfg.version <= 1 ) { + // this was to be initiation, no one shoudl be initiated already. + uasserted(13256, "member " + i->h.toString() + " is already initiated"); + } + else { + // Assure no one has a newer config. + if( res["v"].Int() >= cfg.version ) { + uasserted(13341, "member " + i->h.toString() + " has a config version >= to the new cfg version; cannot change config"); + } + } + } + if( !ok && !res["rs"].trueValue() ) { + down.push_back(i->h.toString()); + + if( !res.isEmpty() ) { + /* strange. got a response, but not "ok". log it. 
*/ + log() << "replSet warning " << i->h.toString() << " replied: " << res.toString() << rsLog; + } + + bool allowFailure = false; + failures += i->votes; + if( !initial && failures <= allowableFailures ) { + const Member* m = theReplSet->findById( i->_id ); + if( m ) { + assert( m->h().toString() == i->h.toString() ); + } + // it's okay if the down member isn't part of the config, + // we might be adding a new member that isn't up yet + allowFailure = true; + } + + if( !allowFailure ) { + string msg = string("need all members up to initiate, not ok : ") + i->h.toStringLong(); + if( !initial ) + msg = string("need most members up to reconfigure, not ok : ") + i->h.toString(); + uasserted(13144, msg); + } + } + } + if( initial ) { + bool hasData = res["hasData"].Bool(); + uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.", + !hasData || i->h.isSelf()); + } + } + if (down.size() > 0) { + result.append("down", down); + } + } + + class CmdReplSetInitiate : public ReplSetCommand { + public: + virtual LockType locktype() const { return NONE; } + CmdReplSetInitiate() : ReplSetCommand("replSetInitiate") { } + virtual void help(stringstream& h) const { + h << "Initiate/christen a replica set."; + h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + log() << "replSet replSetInitiate admin command received from client" << rsLog; + + if( !replSet ) { + errmsg = "server is not running with --replSet"; + return false; + } + if( theReplSet ) { + errmsg = "already initialized"; + result.append("info", "try querying " + rsConfigNs + " to see current configuration"); + return false; + } + + { + // just make sure we can get a write lock before doing anything else. we'll reacquire one + // later. 
of course it could be stuck then, but this check lowers the risk if weird things + // are up. + time_t t = time(0); + writelock lk(""); + if( time(0)-t > 10 ) { + errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; + return false; + } + + /* check that we don't already have an oplog. that could cause issues. + it is ok if the initiating member has *other* data than that. + */ + BSONObj o; + if( Helpers::getFirst(rsoplog, o) ) { + errmsg = rsoplog + string(" is not empty on the initiating member. cannot initiate."); + return false; + } + } + + if( ReplSet::startupStatus == ReplSet::BADCONFIG ) { + errmsg = "server already in BADCONFIG state (check logs); not initiating"; + result.append("info", ReplSet::startupStatusMsg.get()); + return false; + } + if( ReplSet::startupStatus != ReplSet::EMPTYCONFIG ) { + result.append("startupStatus", ReplSet::startupStatus); + errmsg = "all members and seeds must be reachable to initiate set"; + result.append("info", cmdLine._replSet); + return false; + } + + BSONObj configObj; + + if( cmdObj["replSetInitiate"].type() != Object ) { + result.append("info2", "no configuration explicitly specified -- making one"); + log() << "replSet info initiate : no configuration specified. Using a default configuration for the set" << rsLog; + + string name; + vector<HostAndPort> seeds; + set<HostAndPort> seedSet; + parseReplsetCmdLine(cmdLine._replSet, name, seeds, seedSet); // may throw... 
+ + bob b; + b.append("_id", name); + bob members; + members.append("0", BSON( "_id" << 0 << "host" << HostAndPort::Me().dynString() )); + result.append("me", HostAndPort::Me().toString()); + for( unsigned i = 0; i < seeds.size(); i++ ) + members.append(bob::numStr(i+1), BSON( "_id" << i+1 << "host" << seeds[i].toString())); + b.appendArray("members", members.obj()); + configObj = b.obj(); + log() << "replSet created this configuration for initiation : " << configObj.toString() << rsLog; + } + else { + configObj = cmdObj["replSetInitiate"].Obj(); + } + + bool parsed = false; + try { + ReplSetConfig newConfig(configObj); + parsed = true; + + if( newConfig.version > 1 ) { + errmsg = "can't initiate with a version number greater than 1"; + return false; + } + + log() << "replSet replSetInitiate config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; + + checkMembersUpForConfigChange(newConfig, result, true); + + log() << "replSet replSetInitiate all members seem up" << rsLog; + + createOplog(); + + writelock lk(""); + bo comment = BSON( "msg" << "initiating set"); + newConfig.saveConfigLocally(comment); + log() << "replSet replSetInitiate config now saved locally. Should come online in about a minute." << rsLog; + result.append("info", "Config now saved locally. 
Should come online in about a minute."); + ReplSet::startupStatus = ReplSet::SOON; + ReplSet::startupStatusMsg.set("Received replSetInitiate - should come online shortly."); + } + catch( DBException& e ) { + log() << "replSet replSetInitiate exception: " << e.what() << rsLog; + if( !parsed ) + errmsg = string("couldn't parse cfg object ") + e.what(); + else + errmsg = string("couldn't initiate : ") + e.what(); + return false; + } + catch( string& e2 ) { + log() << e2 << rsLog; + errmsg = e2; + return false; + } + + return true; + } + } cmdReplSetInitiate; + +} diff --git a/src/mongo/db/repl/rs_member.h b/src/mongo/db/repl/rs_member.h new file mode 100644 index 00000000000..24e593392b6 --- /dev/null +++ b/src/mongo/db/repl/rs_member.h @@ -0,0 +1,131 @@ +// @file rsmember.h +/* + * Copyright (C) 2010 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** replica set member */ + +#pragma once + +#include "../../util/concurrency/value.h" + +namespace mongo { + + + /* + RS_STARTUP serving still starting up, or still trying to initiate the set + RS_PRIMARY this server thinks it is primary + RS_SECONDARY this server thinks it is a secondary (slave mode) + RS_RECOVERING recovering/resyncing; after recovery usually auto-transitions to secondary + RS_FATAL something bad has occurred and server is not completely offline with regard to the replica set. fatal error. 
       RS_STARTUP2  loaded config, still determining who is primary
    */
    struct MemberState {
        enum MS {
            RS_STARTUP = 0,
            RS_PRIMARY = 1,
            RS_SECONDARY = 2,
            RS_RECOVERING = 3,
            RS_FATAL = 4,
            RS_STARTUP2 = 5,
            RS_UNKNOWN = 6, /* remote node not yet reached */
            RS_ARBITER = 7,
            RS_DOWN = 8, /* node not reachable for a report */
            RS_ROLLBACK = 9
        } s;

        MemberState(MS ms = RS_UNKNOWN) : s(ms) { }
        explicit MemberState(int ms) : s((MS) ms) { }

        // convenience predicates over the raw state value
        bool startup() const { return s == RS_STARTUP; }
        bool primary() const { return s == RS_PRIMARY; }
        bool secondary() const { return s == RS_SECONDARY; }
        bool recovering() const { return s == RS_RECOVERING; }
        bool startup2() const { return s == RS_STARTUP2; }
        bool fatal() const { return s == RS_FATAL; }
        bool rollback() const { return s == RS_ROLLBACK; }
        /** true for the states in which reads may be served (primary or secondary) */
        bool readable() const { return s == RS_PRIMARY || s == RS_SECONDARY; }

        string toString() const;

        bool operator==(const MemberState& r) const { return s == r.s; }
        bool operator!=(const MemberState& r) const { return s != r.s; }
    };

    /* this is supposed to be just basic information on a member,
       and copy constructable. */
    class HeartbeatInfo {
        unsigned _id;   // member id within the set config; 0xffffffff = unset
    public:
        HeartbeatInfo() : _id(0xffffffff), hbstate(MemberState::RS_UNKNOWN), health(-1.0),
            downSince(0), skew(INT_MIN), authIssue(false), ping(0) { }
        HeartbeatInfo(unsigned id);
        unsigned id() const { return _id; }
        MemberState hbstate;            // state the member last reported
        double health;                  // -1 = never checked, 0 = down, >0 = up
        time_t upSince;
        long long downSince;
        time_t lastHeartbeat;
        DiagStr lastHeartbeatMsg;
        OpTime opTime;                  // member's last reported optime
        int skew;                       // clock skew vs us; INT_MIN = unknown
        bool authIssue;
        unsigned int ping; // milliseconds
        static unsigned int numPings;   // defined elsewhere (heartbeat code) -- confirm

        bool up() const { return health > 0; }

        /** health is set to -1 on startup. that means we haven't even checked yet.  0 means we checked and it failed. */
        bool maybeUp() const { return health != 0; }

        long long timeDown() const; // ms

        /* true if changed in a way of interest to the repl set manager.
         */
        bool changed(const HeartbeatInfo& old) const;
    };

    // note: unlike the default ctor, this one leaves lastHeartbeatMsg/opTime
    // default-constructed and initializes the rest in the body
    inline HeartbeatInfo::HeartbeatInfo(unsigned id) :
        _id(id),
        authIssue(false),
        ping(0) {
        hbstate = MemberState::RS_UNKNOWN;
        health = -1.0;
        downSince = 0;
        lastHeartbeat = upSince = 0;
        skew = INT_MIN;
    }

    // only health and state transitions are "interesting"; ping/optime churn is not
    inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
        return health != old.health ||
               hbstate != old.hbstate;
    }

    inline string MemberState::toString() const {
        switch ( s ) {
        case RS_STARTUP: return "STARTUP";
        case RS_PRIMARY: return "PRIMARY";
        case RS_SECONDARY: return "SECONDARY";
        case RS_RECOVERING: return "RECOVERING";
        case RS_FATAL: return "FATAL";
        case RS_STARTUP2: return "STARTUP2";
        case RS_ARBITER: return "ARBITER";
        case RS_DOWN: return "DOWN";
        case RS_ROLLBACK: return "ROLLBACK";
        case RS_UNKNOWN: return "UNKNOWN";
        }
        return "";
    }

}
diff --git a/src/mongo/db/repl/rs_optime.h b/src/mongo/db/repl/rs_optime.h
new file mode 100644
index 00000000000..f0ca56927ad
--- /dev/null
+++ b/src/mongo/db/repl/rs_optime.h
@@ -0,0 +1,58 @@
// @file rs_optime.h

/*
 * Copyright (C) 2010 10gen Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#pragma once + +#include "../../util/optime.h" + +namespace mongo { + + const char rsoplog[] = "local.oplog.rs"; + + /* + class RSOpTime : public OpTime { + public: + bool initiated() const { return getSecs() != 0; } + };*/ + + /*struct RSOpTime { + unsigned long long ord; + + RSOpTime() : ord(0) { } + + bool initiated() const { return ord > 0; } + + void initiate() { + assert( !initiated() ); + ord = 1000000; + } + + ReplTime inc() { + DEV assertInWriteLock(); + return ++ord; + } + + string toString() const { return str::stream() << ord; } + + // query the oplog and set the highest value herein. acquires a db read lock. throws. + void load(); + }; + + extern RSOpTime rsOpTime;*/ + +} diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp new file mode 100644 index 00000000000..10727c59669 --- /dev/null +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -0,0 +1,667 @@ +/* @file rs_rollback.cpp +* +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../repl.h" +#include "../ops/query.h" +#include "../cloner.h" +#include "../ops/update.h" +#include "../ops/delete.h" + +/* Scenarios + + We went offline with ops not replicated out. + + F = node that failed and coming back. 
+ P = node that took over, new primary + + #1: + F : a b c d e f g + P : a b c d q + + The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P + will have significantly more data. Also note that P may have a proper subset of F's stream if there were + no subsequent writes. + + For now the model is simply : get F back in sync with P. If P was really behind or something, we should have + just chosen not to fail over anyway. + + #2: + F : a b c d e f g -> a b c d + P : a b c d + + #3: + F : a b c d e f g -> a b c d q r s t u v w x z + P : a b c d.q r s t u v w x z + + Steps + find an event in common. 'd'. + undo our events beyond that by: + (1) taking copy from other server of those objects + (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object + -- i.e., reset minvalid. + (3) we could skip operations on objects that are previous in time to our capture of the object as an optimization. + +*/ + +namespace mongo { + + using namespace bson; + + void incRBID(); + + class rsfatal : public std::exception { + public: + virtual const char* what() const throw() { return "replica set fatal exception"; } + }; + + struct DocID { + const char *ns; + be _id; + bool operator<(const DocID& d) const { + int c = strcmp(ns, d.ns); + if( c < 0 ) return true; + if( c > 0 ) return false; + return _id < d._id; + } + }; + + struct HowToFixUp { + /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only + need to refetch it once. 
*/ + set<DocID> toRefetch; + + /* collections to drop */ + set<string> toDrop; + + set<string> collectionsToResync; + + OpTime commonPoint; + DiskLoc commonPointOurDiskloc; + + int rbid; // remote server's current rollback sequence # + }; + + static void refetch(HowToFixUp& h, const BSONObj& ourObj) { + const char *op = ourObj.getStringField("op"); + if( *op == 'n' ) + return; + + unsigned long long totSize = 0; + totSize += ourObj.objsize(); + if( totSize > 512 * 1024 * 1024 ) + throw "rollback too large"; + + DocID d; + // NOTE The assigned ns value may become invalid if we yield. + d.ns = ourObj.getStringField("ns"); + if( *d.ns == 0 ) { + log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog; + return; + } + + bo o = ourObj.getObjectField(*op=='u' ? "o2" : "o"); + if( o.isEmpty() ) { + log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog; + return; + } + + if( *op == 'c' ) { + be first = o.firstElement(); + NamespaceString s(d.ns); // foo.$cmd + string cmdname = first.fieldName(); + Command *cmd = Command::findCommand(cmdname.c_str()); + if( cmd == 0 ) { + log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog; + return; + } + else { + /* findandmodify - tranlated? + godinsert?, + renamecollection a->b. just resync a & b + */ + if( cmdname == "create" ) { + /* Create collection operation + { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } } + */ + string ns = s.db + '.' + o["create"].String(); // -> foo.abc + h.toDrop.insert(ns); + return; + } + else if( cmdname == "drop" ) { + string ns = s.db + '.' + first.valuestr(); + h.collectionsToResync.insert(ns); + return; + } + else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { + /* TODO: this is bad. we simply full resync the collection here, which could be very slow. 
*/ + log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog; + string ns = s.db + '.' + first.valuestr(); + h.collectionsToResync.insert(ns); + return; + } + else if( cmdname == "renameCollection" ) { + /* TODO: slow. */ + log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog; + string from = first.valuestr(); + string to = o["to"].String(); + h.collectionsToResync.insert(from); + h.collectionsToResync.insert(to); + return; + } + else if( cmdname == "reIndex" ) { + return; + } + else if( cmdname == "dropDatabase" ) { + log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog; + log() << "replSet " << o.toString() << rsLog; + throw rsfatal(); + } + else { + log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog; + log() << "replSet cmdname=" << cmdname << rsLog; + throw rsfatal(); + } + } + } + + d._id = o["_id"]; + if( d._id.eoo() ) { + log() << "replSet WARNING ignoring op on rollback no _id TODO : " << d.ns << ' '<< ourObj.toString() << rsLog; + return; + } + + h.toRefetch.insert(d); + } + + int getRBID(DBClientConnection*); + + static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { + static time_t last; + if( time(0)-last < 60 ) { + throw "findcommonpoint waiting a while before trying again"; + } + last = time(0); + + assert( d.dbMutex.atLeastReadLocked() ); + Client::Context c(rsoplog); + NamespaceDetails *nsd = nsdetails(rsoplog); + assert(nsd); + ReverseCappedCursor u(nsd); + if( !u.ok() ) + throw "our oplog empty or unreadable"; + + const Query q = Query().sort(reverseNaturalObj); + const bo fields = BSON( "ts" << 1 << "h" << 1 ); + + //auto_ptr<DBClientCursor> u = us->query(rsoplog, q, 0, 0, &fields, 0, 0); + + h.rbid = getRBID(them); + auto_ptr<DBClientCursor> t = them->query(rsoplog, q, 0, 0, &fields, 0, 0); + + if( t.get() == 0 || !t->more() ) throw "remote oplog empty or 
unreadable"; + + BSONObj ourObj = u.current(); + OpTime ourTime = ourObj["ts"]._opTime(); + BSONObj theirObj = t->nextSafe(); + OpTime theirTime = theirObj["ts"]._opTime(); + + { + long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs()); + /* diff could be positive, negative, or zero */ + log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog; + log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog; + log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog; + if( diff > 1800 ) { + log() << "replSet rollback too long a time period for a rollback." << rsLog; + throw "error not willing to roll back more than 30 minutes of data"; + } + } + + unsigned long long scanned = 0; + while( 1 ) { + scanned++; + /* todo add code to assure no excessive scanning for too long */ + if( ourTime == theirTime ) { + if( ourObj["h"].Long() == theirObj["h"].Long() ) { + // found the point back in time where we match. + // todo : check a few more just to be careful about hash collisions. 
+ log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog; + log() << "replSet rollback findcommonpoint scanned : " << scanned << rsLog; + h.commonPoint = ourTime; + h.commonPointOurDiskloc = u.currLoc(); + return; + } + + refetch(h, ourObj); + + if( !t->more() ) { + log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS100 reached beginning of remote oplog [2]"; + } + theirObj = t->nextSafe(); + theirTime = theirObj["ts"]._opTime(); + + u.advance(); + if( !u.ok() ) { + log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS101 reached beginning of local oplog [1]"; + } + ourObj = u.current(); + ourTime = ourObj["ts"]._opTime(); + } + else if( theirTime > ourTime ) { + if( !t->more() ) { + log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS100 reached beginning of remote oplog [1]"; + } + theirObj = t->nextSafe(); + theirTime = theirObj["ts"]._opTime(); + } + else { + // theirTime < ourTime + refetch(h, ourObj); + u.advance(); + if( !u.ok() ) { + log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << 
theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS101 reached beginning of local oplog [2]"; + } + ourObj = u.current(); + ourTime = ourObj["ts"]._opTime(); + } + } + } + + struct X { + const bson::bo *op; + bson::bo goodVersionOfObject; + }; + + static void setMinValid(bo newMinValid) { + try { + log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; + } + catch(...) { } + { + Helpers::putSingleton("local.replset.minvalid", newMinValid); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } + } + + void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { + DBClientConnection *them = r.conn(); + + // fetch all first so we needn't handle interruption in a fancy way + + unsigned long long totSize = 0; + + list< pair<DocID,bo> > goodVersions; + + bo newMinValid; + + /* fetch all the goodVersions of each document from current primary */ + DocID d; + unsigned long long n = 0; + try { + for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) { + d = *i; + + assert( !d._id.eoo() ); + + { + /* TODO : slow. lots of round trips. 
*/ + n++; + bo good= them->findOne(d.ns, d._id.wrap(), NULL, QueryOption_SlaveOk).getOwned(); + totSize += good.objsize(); + uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); + + // note good might be eoo, indicating we should delete it + goodVersions.push_back(pair<DocID,bo>(d,good)); + } + } + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + sethbmsg("rollback error newMinValid empty?"); + return; + } + } + catch(DBException& e) { + sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); + log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; + throw e; + } + + MemoryMappedFile::flushAll(true); + + sethbmsg("rollback 3.5"); + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); + return; + } + + // update them + sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); + + bool warn = false; + + assert( !h.commonPointOurDiskloc.isNull() ); + + mongo::d.dbMutex.assertWriteLocked(); + + /* we have items we are writing that aren't from a point-in-time. thus best not to come online + until we get to that point in freshness. */ + setMinValid(newMinValid); + + /** any full collection resyncs required? 
*/ + if( !h.collectionsToResync.empty() ) { + for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { + string ns = *i; + sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); + + Client::Context c(ns); + { + bob res; + string errmsg; + dropCollection(ns, errmsg, res); + { + dbtemprelease r; + bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, errmsg); + uassert(15909, str::stream() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg, ok); + } + } + } + + /* we did more reading from primary, so check it again for a rollback (which would mess us up), and + make minValid newer. + */ + sethbmsg("rollback 4.2"); + { + string err; + try { + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + err = "can't get minvalid from primary"; + } + else { + setMinValid(newMinValid); + } + } + catch (DBException&) { + err = "can't get/set minvalid"; + } + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + // however, we've now done writes. thus we have a problem. + err += "rbid at primary changed during resync/rollback"; + } + if( !err.empty() ) { + log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; + /* todo: reset minvalid so that we are permanently in fatal state */ + /* todo: don't be fatal, but rather, get all the data first. */ + sethbmsg("rollback error"); + throw rsfatal(); + } + } + sethbmsg("rollback 4.3"); + } + + sethbmsg("rollback 4.6"); + /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ + for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { + Client::Context c(*i); + try { + bob res; + string errmsg; + log(1) << "replSet rollback drop: " << *i << rsLog; + dropCollection(*i, errmsg, res); + } + catch(...) 
{ + log() << "replset rollback error dropping collection " << *i << rsLog; + } + } + + sethbmsg("rollback 4.7"); + Client::Context c(rsoplog); + NamespaceDetails *oplogDetails = nsdetails(rsoplog); + uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); + + map<string,shared_ptr<RemoveSaver> > removeSavers; + + unsigned deletes = 0, updates = 0; + for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) { + const DocID& d = i->first; + bo pattern = d._id.wrap(); // { _id : ... } + try { + assert( d.ns && *d.ns ); + if( h.collectionsToResync.count(d.ns) ) { + /* we just synced this entire collection */ + continue; + } + + getDur().commitIfNeeded(); + + /* keep an archive of items rolled back */ + shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; + if ( ! rs ) + rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); + + // todo: lots of overhead in context, this can be faster + Client::Context c(d.ns); + if( i->second.isEmpty() ) { + // wasn't on the primary; delete. + /* TODO1.6 : can't delete from a capped collection. need to handle that here. */ + deletes++; + + NamespaceDetails *nsd = nsdetails(d.ns); + if( nsd ) { + if( nsd->capped ) { + /* can't delete from a capped collection - so we truncate instead. if this item must go, + so must all successors!!! */ + try { + /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */ + // this will crazy slow if no _id index. + long long start = Listener::getElapsedTimeMillis(); + DiskLoc loc = Helpers::findOne(d.ns, pattern, false); + if( Listener::getElapsedTimeMillis() - start > 200 ) + log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" 
<< rsLog; + //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); + if( !loc.isNull() ) { + try { + nsd->cappedTruncateAfter(d.ns, loc, true); + } + catch(DBException& e) { + if( e.getCode() == 13415 ) { + // hack: need to just make cappedTruncate do this... + nsd->emptyCappedCollection(d.ns); + } + else { + throw; + } + } + } + } + catch(DBException& e) { + log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog; + } + } + else { + try { + deletes++; + deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() ); + } + catch(...) { + log() << "replSet error rollback delete failed ns:" << d.ns << rsLog; + } + } + // did we just empty the collection? if so let's check if it even exists on the source. + if( nsd->stats.nrecords == 0 ) { + try { + string sys = cc().database()->name + ".system.namespaces"; + bo o = them->findOne(sys, QUERY("name"<<d.ns)); + if( o.isEmpty() ) { + // we should drop + try { + bob res; + string errmsg; + dropCollection(d.ns, errmsg, res); + } + catch(...) { + log() << "replset error rolling back collection " << d.ns << rsLog; + } + } + } + catch(DBException& ) { + /* this isn't *that* big a deal, but is bad. */ + log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog; + } + } + } + } + else { + // todo faster... 
+ OpDebug debug; + updates++; + _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() ); + } + } + catch(DBException& e) { + log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog; + warn = true; + } + } + + removeSavers.clear(); // this effectively closes all of them + + sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); + MemoryMappedFile::flushAll(true); + sethbmsg("rollback 6"); + + // clean up oplog + LOG(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; + // todo: fatal error if this throws? + oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); + + /* reset cached lastoptimewritten and h value */ + loadLastOpTimeWritten(); + + sethbmsg("rollback 7"); + MemoryMappedFile::flushAll(true); + + // done + if( warn ) + sethbmsg("issues during syncRollback, see log"); + else + sethbmsg("rollback done"); + } + + void ReplSetImpl::syncRollback(OplogReader&r) { + unsigned s = _syncRollback(r); + if( s ) + sleepsecs(s); + } + + unsigned ReplSetImpl::_syncRollback(OplogReader&r) { + assert( !lockedByMe() ); + assert( !d.dbMutex.atLeastReadLocked() ); + + sethbmsg("rollback 0"); + + writelocktry lk(rsoplog, 20000); + if( !lk.got() ) { + sethbmsg("rollback couldn't get write lock in a reasonable time"); + return 2; + } + + if( state().secondary() ) { + /* by doing this, we will not service reads (return an error as we aren't in secondary staate. + that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred + or removed or yielded later anyway. + + also, this is better for status reporting - we know what is happening. 
+ */ + changeState(MemberState::RS_ROLLBACK); + } + + HowToFixUp how; + sethbmsg("rollback 1"); + { + r.resetCursor(); + + sethbmsg("rollback 2 FindCommonPoint"); + try { + syncRollbackFindCommonPoint(r.conn(), how); + } + catch( const char *p ) { + sethbmsg(string("rollback 2 error ") + p); + return 10; + } + catch( rsfatal& ) { + _fatal(); + return 2; + } + catch( DBException& e ) { + sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); + dbtemprelease r; + sleepsecs(60); + throw; + } + } + + sethbmsg("replSet rollback 3 fixup"); + + { + incRBID(); + try { + syncFixUp(how, r); + } + catch( rsfatal& ) { + sethbmsg("rollback fixup error"); + _fatal(); + return 2; + } + catch(...) { + incRBID(); throw; + } + incRBID(); + + /* success - leave "ROLLBACK" state + can go to SECONDARY once minvalid is achieved + */ + changeState(MemberState::RS_RECOVERING); + } + + return 0; + } + +} diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp new file mode 100644 index 00000000000..8bac981d951 --- /dev/null +++ b/src/mongo/db/repl/rs_sync.cpp @@ -0,0 +1,701 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "pch.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../repl.h" +#include "connections.h" + +namespace mongo { + + using namespace bson; + extern unsigned replSetForceInitialSyncFailure; + + void NOINLINE_DECL blank(const BSONObj& o) { + if( *o.getStringField("op") != 'n' ) { + log() << "replSet skipping bad op in oplog: " << o.toString() << rsLog; + } + } + + /* apply the log op that is in param o + @return bool success (true) or failure (false) + */ + bool replset::SyncTail::syncApply(const BSONObj &o) { + const char *ns = o.getStringField("ns"); + if ( *ns == '.' || *ns == 0 ) { + blank(o); + return true; + } + + Client::Context ctx(ns); + ctx.getClient()->curop()->reset(); + return !applyOperation_inlock(o); + } + + /* initial oplog application, during initial sync, after cloning. + @return false on failure. + this method returns an error and doesn't throw exceptions (i think). + */ + bool ReplSetImpl::initialSyncOplogApplication(const OpTime& applyGTE, const OpTime& minValid) { + Member *source = 0; + OplogReader r; + + // keep trying to initial sync from oplog until we run out of targets + while ((source = _getOplogReader(r, applyGTE)) != 0) { + replset::InitialSync init(source->fullName()); + if (init.oplogApplication(r, source, applyGTE, minValid)) { + return true; + } + + r.resetConnection(); + veto(source->fullName(), 60); + log() << "replSet applying oplog from " << source->fullName() << " failed, trying again" << endl; + } + + log() << "replSet initial sync error: couldn't find oplog to sync from" << rsLog; + return false; + } + + bool replset::InitialSync::oplogApplication(OplogReader& r, const Member* source, + const OpTime& applyGTE, const OpTime& minValid) { + + const string hn = source->fullName(); + try { + r.tailingQueryGTE( rsoplog, applyGTE ); + if ( !r.haveCursor() ) { + log() << "replSet initial sync oplog query error" << rsLog; + return false; + } + + { + if( !r.more() ) { + 
sethbmsg("replSet initial sync error reading remote oplog"); + log() << "replSet initial sync error remote oplog (" << rsoplog << ") on host " << hn << " is empty?" << rsLog; + return false; + } + bo op = r.next(); + OpTime t = op["ts"]._opTime(); + r.putBack(op); + + if( op.firstElementFieldName() == string("$err") ) { + log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog; + return false; + } + + uassert( 13508 , str::stream() << "no 'ts' in first op in oplog: " << op , !t.isNull() ); + if( t > applyGTE ) { + sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync"); + log() << "replSet initial sync expected first optime of " << applyGTE << rsLog; + log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog; + return false; + } + + sethbmsg(str::stream() << "initial oplog application from " << hn << " starting at " + << t.toStringPretty() << " to " << minValid.toStringPretty()); + } + } + catch(DBException& e) { + log() << "replSet initial sync failing: " << e.toString() << rsLog; + return false; + } + + /* we lock outside the loop to avoid the overhead of locking on every operation. */ + writelock lk(""); + + // todo : use exhaust + OpTime ts; + time_t start = time(0); + unsigned long long n = 0; + int fails = 0; + while( ts < minValid ) { + try { + // There are some special cases with initial sync (see the catch block), so we + // don't want to break out of this while until we've reached minvalid. Thus, we'll + // keep trying to requery. 
+ if( !r.more() ) { + OCCASIONALLY log() << "replSet initial sync oplog: no more records" << endl; + sleepsecs(1); + + r.resetCursor(); + r.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten); + if ( !r.haveCursor() ) { + if (fails++ > 30) { + log() << "replSet initial sync tried to query oplog 30 times, giving up" << endl; + return false; + } + } + + continue; + } + + BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ + ts = o["ts"]._opTime(); + + { + if( (source->state() != MemberState::RS_PRIMARY && + source->state() != MemberState::RS_SECONDARY) || + replSetForceInitialSyncFailure ) { + + int f = replSetForceInitialSyncFailure; + if( f > 0 ) { + replSetForceInitialSyncFailure = f-1; + log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog; + throw DBException("forced error",0); + } + log() << "replSet we are now primary" << rsLog; + throw DBException("primary changed",0); + } + + applyOp(o, applyGTE); + } + + if ( ++n % 1000 == 0 ) { + time_t now = time(0); + if (now - start > 10) { + // simple progress metering + log() << "replSet initialSyncOplogApplication applied " << n << " operations, synced to " + << ts.toStringPretty() << rsLog; + start = now; + } + } + + getDur().commitIfNeeded(); + } + catch (DBException& e) { + // Skip duplicate key exceptions. + // These are relatively common on initial sync: if a document is inserted + // early in the clone step, the insert will be replayed but the document + // will probably already have been cloned over. 
+ if( e.getCode() == 11000 || e.getCode() == 11001 || e.getCode() == 12582) { + continue; + } + + // handle cursor not found (just requery) + if( e.getCode() == 13127 ) { + log() << "replSet requerying oplog after cursor not found condition, ts: " << ts.toStringPretty() << endl; + r.resetCursor(); + r.tailingQueryGTE(rsoplog, ts); + if( r.haveCursor() ) { + continue; + } + } + + // TODO: handle server restart + + if( ts <= minValid ) { + // didn't make it far enough + log() << "replSet initial sync failing, error applying oplog : " << e.toString() << rsLog; + return false; + } + + // otherwise, whatever, we'll break out of the loop and catch + // anything that's really wrong in syncTail + } + } + return true; + } + + void replset::InitialSync::applyOp(const BSONObj& o, const OpTime& applyGTE) { + OpTime ts = o["ts"]._opTime(); + + // optimes before we started copying need not be applied. + if( ts >= applyGTE ) { + if (!syncApply(o)) { + if (shouldRetry(o)) { + uassert(15915, "replSet update still fails after adding missing object", syncApply(o)); + } + } + } + + // with repl sets we write the ops to our oplog, too + _logOpObjRS(o); + } + + /* should be in RECOVERING state on arrival here. 
+ readlocks + @return true if transitioned to SECONDARY + */ + bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool golive = false; + + { + lock lk( this ); + + if (_maintenanceMode > 0) { + // we're not actually going live + return true; + } + } + + { + readlock lk("local.replset.minvalid"); + BSONObj mv; + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + minvalid = mv["ts"]._opTime(); + if( minvalid <= lastOpTimeWritten ) { + golive=true; + } + } + else + golive = true; /* must have been the original member */ + } + if( golive ) { + sethbmsg(""); + changeState(MemberState::RS_SECONDARY); + } + return golive; + } + + bool ReplSetImpl::_isStale(OplogReader& r, const OpTime& startTs, BSONObj& remoteOldestOp) { + remoteOldestOp = r.findOne(rsoplog, Query()); + OpTime remoteTs = remoteOldestOp["ts"]._opTime(); + DEV log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog; + else LOG(3) << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog; + DEV { + log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet our state: " << state().toString() << rsLog; + } + if( startTs >= remoteTs ) { + return false; + } + + return true; + } + + Member* ReplSetImpl::_getOplogReader(OplogReader& r, const OpTime& minTS) { + Member *target = 0, *stale = 0; + BSONObj oldest; + + assert(r.conn() == 0); + + while ((target = getMemberToSyncTo()) != 0) { + string current = target->fullName(); + + if( !r.connect(current) ) { + log(2) << "replSet can't connect to " << current << " to read operations" << rsLog; + r.resetConnection(); + veto(current); + continue; + } + + if( !minTS.isNull() && _isStale(r, minTS, oldest) ) { + r.resetConnection(); + veto(current, 600); + stale = target; + continue; + } + + // if we made it here, the target is up and not stale + return target; + } + + // the only viable sync target was stale + if (stale) { + log() << "replSet error RS102 too stale to catch 
up, at least from " << stale->fullName() << rsLog;
        // (continuation of the "too stale to catch up" handler whose start is above this chunk)
        log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
        log() << "replSet oldest at " << stale->fullName() << " : " << oldest["ts"]._opTime().toStringLong() << rsLog;
        log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;

        // reset minvalid so that we can't become primary prematurely
        {
            writelock lk("local.replset.minvalid");
            Helpers::putSingleton("local.replset.minvalid", oldest);
        }

        sethbmsg("error RS102 too stale to catch up");
        changeState(MemberState::RS_RECOVERING);
        sleepsecs(120);
    }

    return 0;
}

/* tail an oplog. ok to return, will be re-called.
 *
 * Picks a sync target, opens a tailing cursor on its oplog starting at
 * lastOpTimeWritten, checks whether a rollback is needed (first batch must
 * match our last written ts/hash), then applies ops in a loop until the
 * cursor dies, the target becomes unreadable, or we become primary.
 */
void ReplSetImpl::syncTail() {
    // todo : locking vis a vis the mgr...
    OplogReader r;
    string hn; // NOTE(review): never assigned in this chunk, so the log lines below print it empty -- confirm against the full file
    // find a target to sync from the last op time written
    Member* target = _getOplogReader(r, lastOpTimeWritten);

    // no server found
    if (target == 0) {
        // if there is no one to sync from, at least try to transition to
        // SECONDARY if minvalid has been reached
        OpTime minvalid;
        tryToGoLiveAsASecondary(minvalid);
        return;
    }

    r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
    // if target cut connections between connecting and querying (for
    // example, because it stepped down) we might not have a cursor
    if ( !r.haveCursor() ) {
        return;
    }

    uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );

    // rollback check: the first op returned must equal the last op we wrote
    // (same timestamp AND same hash); otherwise our history diverged.
    {
        if( !r.more() ) {
            /* maybe we are ahead and need to roll back? */
            try {
                bo theirLastOp = r.getLastOp(rsoplog);
                if( theirLastOp.isEmpty() ) {
                    log() << "replSet error empty query result from " << hn << " oplog" << rsLog;
                    sleepsecs(2);
                    return;
                }
                OpTime theirTS = theirLastOp["ts"]._opTime();
                if( theirTS < lastOpTimeWritten ) {
                    log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
                    syncRollback(r);
                    return;
                }
                /* we're not ahead? maybe our new query got fresher data.
                   best to come back and try again */
                log() << "replSet syncTail condition 1" << rsLog;
                sleepsecs(1);
            }
            catch(DBException& e) {
                log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog;
                // temporarily blacklist this target so the next pass picks another
                veto(target->fullName());
                sleepsecs(2);
            }
            return;
        }

        BSONObj o = r.nextSafe();
        OpTime ts = o["ts"]._opTime();
        long long h = o["h"].numberLong();
        if( ts != lastOpTimeWritten || h != lastH ) {
            // mismatch => our last op is not in the source's oplog: roll back
            log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << rsLog;
            log() << "replset source's GTE: " << ts.toStringPretty() << rsLog;
            syncRollback(r);
            return;
        }
    }

    /* we have now checked if we need to rollback and we either don't have to or did it. */
    {
        OpTime minvalid;
        tryToGoLiveAsASecondary(minvalid);
    }

    // outer loop: one iteration per tailing-cursor pass
    while( 1 ) {
        {
            Timer timeInWriteLock;
            writelock lk("");
            // inner loop: drain the current batch while holding the write lock,
            // yielding (dbtemprelease) between batches and when we've held the
            // lock too long
            while( 1 ) {
                if( !r.moreInCurrentBatch() ) {
                    dbtemprelease tempRelease;
                    {
                        // we need to occasionally check some things. between
                        // batches is probably a good time.
                        if( state().recovering() ) { // perhaps we should check this earlier? but not before the rollback checks.
                            /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
                            OpTime minvalid;
                            bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid);
                            if( golive ) {
                                ;
                            }
                            else {
                                sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString());
                            }
                            // todo: too stale capability
                        }
                        if( !target->hbinfo().hbstate.readable() ) {
                            return;
                        }
                    }
                    r.more(); // to make the requestmore outside the db lock, which obviously is quite important
                }
                if( timeInWriteLock.micros() > 1000 ) {
                    // held the write lock > 1ms: yield to other operations
                    dbtemprelease tempRelease;
                    timeInWriteLock.reset();
                }
                if( !r.more() )
                    break;
                {
                    BSONObj o = r.nextSafe(); // note we might get "not master" at some point

                    int sd = myConfig().slaveDelay;
                    // ignore slaveDelay if the box is still initializing. once
                    // it becomes secondary we can worry about it.
                    if( sd && box.getState().secondary() ) {
                        const OpTime ts = o["ts"]._opTime();
                        long long a = ts.getSecs();
                        long long b = time(0);
                        long long lag = b - a;
                        long long sleeptime = sd - lag;
                        if( sleeptime > 0 ) {
                            // sleep outside the db lock
                            dbtemprelease tempRelease;
                            uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000);
                            if( sleeptime < 60 ) {
                                sleepsecs((int) sleeptime);
                            }
                            else {
                                log() << "replSet slavedelay sleep long time: " << sleeptime << rsLog;
                                // sleep(hours) would prevent reconfigs from taking effect & such!
                                // so wake up every 6s and re-check exit conditions
                                long long waitUntil = b + sleeptime;
                                while( 1 ) {
                                    sleepsecs(6);
                                    if( time(0) >= waitUntil )
                                        break;

                                    if( !target->hbinfo().hbstate.readable() ) {
                                        break;
                                    }

                                    if( myConfig().slaveDelay != sd ) // reconf
                                        break;
                                }
                            }
                        }
                    } // endif slaveDelay

                    // NOTE(review): 'd' is declared outside this chunk -- presumably the global db mutex holder; confirm
                    d.dbMutex.assertWriteLocked();
                    try {
                        /* if we have become primary, we don't want to apply things from elsewhere
                           anymore. assumePrimary is in the db lock so we are safe as long as
                           we check after we locked above. */
                        if( box.getState().primary() ) {
                            log(0) << "replSet stopping syncTail we are now primary" << rsLog;
                            return;
                        }

                        // TODO: make this whole method a member of SyncTail (SERVER-4444)
                        replset::SyncTail tail("");
                        tail.syncApply(o);
                        _logOpObjRS(o); // with repl sets we write the ops to our oplog too
                    }
                    catch (DBException& e) {
                        sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o);
                        veto(target->fullName(), 300);
                        sleepsecs(30);
                        return;
                    }
                }
            } // end while
        } // end writelock scope

        r.tailCheck();
        if( !r.haveCursor() ) {
            LOG(1) << "replSet end syncTail pass with " << hn << rsLog;
            // TODO : reuse our connection to the primary.
            return;
        }

        if( !target->hbinfo().hbstate.readable() ) {
            return;
        }
        // looping back is ok because this is a tailable cursor
    }
}

/* One sync pass: decides between doing nothing (primary / blocked / fatal /
 * startup), running initial sync (no data yet), or tailing the oplog. */
void ReplSetImpl::_syncThread() {
    StateBox::SP sp = box.get();
    if( sp.state.primary() ) {
        sleepsecs(1);
        return;
    }
    if( _blockSync || sp.state.fatal() || sp.state.startup() ) {
        sleepsecs(5);
        return;
    }

    /* do we have anything at all? */
    if( lastOpTimeWritten.isNull() ) {
        syncDoInitialSync();
        return; // _syncThread will be recalled, starts from top again in case sync failed.
    }

    /* we have some data. continue tailing. */
    syncTail();
}

/* Main loop of the sync thread: repeatedly runs _syncThread(), logging and
 * backing off on exceptions. Returns (ending the thread) if we are not in
 * the config or are an arbiter. */
void ReplSetImpl::syncThread() {
    while( 1 ) {
        // After a reconfig, we may not be in the replica set anymore, so
        // check that we are in the set (and not an arbiter) before
        // trying to sync with other replicas.
        if( ! _self ) {
            log() << "replSet warning did not detect own host and port, not syncing, config: " << theReplSet->config() << rsLog;
            return;
        }
        if( myConfig().arbiterOnly ) {
            return;
        }

        try {
            _syncThread();
        }
        catch(DBException& e) {
            sethbmsg(str::stream() << "syncThread: " << e.toString());
            sleepsecs(10);
        }
        catch(...) {
            sethbmsg("unexpected exception in syncThread()");
            // TODO : SET NOT SECONDARY here?
            sleepsecs(60);
        }
        sleepsecs(1);

        /* normally msgCheckNewState gets called periodically, but in a single node repl set there
           are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton
           member has done a stepDown() and needs to come back up.
        */
        OCCASIONALLY {
            mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
        }
    }
}

/* Thread entry point for the replica-set sync thread. The static counter
 * guards (via assert) against this ever being started twice. */
void startSyncThread() {
    static int n;
    if( n != 0 ) {
        log() << "replSet ERROR : more than one sync thread?"
              << rsLog;
        assert( n == 0 );
    }
    n++;

    Client::initThread("rsSync");
    cc().iAmSyncThread(); // for isSyncThread() (not used much; the secondary create-index code checks it)
    replLocalAuth();
    theReplSet->syncThread();
    cc().shutdown();
}

/* Thread setup for the ghost-sync thread: register the client and authenticate locally. */
void GhostSync::starting() {
    Client::initThread("rsGhostSync");
    replLocalAuth();
}

/* Enable/disable syncing. Blocking sync forces us into (and keeps us in)
 * RECOVERING, since syncing is the only path to SECONDARY. */
void ReplSetImpl::blockSync(bool block) {
    _blockSync = block;
    if (_blockSync) {
        // syncing is how we get into SECONDARY state, so we'll be stuck in
        // RECOVERING until we unblock
        changeState(MemberState::RS_RECOVERING);
    }
}

/* Associate a slave's replication id (id["_id"]) with the set member that
 * has the given memberId, creating the ghost-cache entry if needed.
 * Takes _lock exclusively. */
void GhostSync::associateSlave(const BSONObj& id, const int memberId) {
    const OID rid = id["_id"].OID();
    rwlock lk( _lock , true );
    shared_ptr<GhostSlave> &g = _ghostCache[rid];
    if( g.get() == 0 ) {
        g.reset( new GhostSlave() );
        // sanity bound on cache size; wassert only warns, does not throw
        wassert( _ghostCache.size() < 10000 );
    }
    GhostSlave &slave = *g;
    if (slave.init) {
        // already associated; nothing to do
        LOG(1) << "tracking " << slave.slave->h().toString() << " as " << rid << rsLog;
        return;
    }

    slave.slave = (Member*)rs->findById(memberId);
    if (slave.slave != 0) {
        slave.init = true;
    }
    else {
        log() << "replset couldn't find a slave with id " << memberId
              << ", not tracking " << rid << rsLog;
    }
}

/* Record a slave's latest applied optime against its member's groups.
 * Takes _lock shared; silently (occasionally-logged) ignores unknown or
 * uninitialized slaves. */
void GhostSync::updateSlave(const mongo::OID& rid, const OpTime& last) {
    rwlock lk( _lock , false );
    MAP::iterator i = _ghostCache.find( rid );
    if ( i == _ghostCache.end() ) {
        OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog;
        return;
    }

    GhostSlave& slave = *(i->second);
    if (!slave.init) {
        OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
        return;
    }

    ((ReplSetConfig::MemberCfg)slave.slave->config()).updateGroups(last);
}

/* Advance the ghost slave's position in our sync target's oplog up to
 * 'last'. (Function continues past the end of this chunk.) */
void GhostSync::percolate(const BSONObj& id, const OpTime& last) {
    const OID rid = id["_id"].OID();
    GhostSlave* slave;
    {
        rwlock lk( _lock , false );

        MAP::iterator i = _ghostCache.find( rid );
        if ( i ==
_ghostCache.end() ) { + OCCASIONALLY log() << "couldn't percolate slave " << rid << " no entry" << rsLog; + return; + } + + slave = i->second.get(); + if (!slave->init) { + OCCASIONALLY log() << "couldn't percolate slave " << rid << " not init" << rsLog; + return; + } + } + + assert(slave->slave); + + const Member *target = rs->_currentSyncTarget; + if (!target || rs->box.getState().primary() + // we are currently syncing from someone who's syncing from us + // the target might end up with a new Member, but s.slave never + // changes so we'll compare the names + || target == slave->slave || target->fullName() == slave->slave->fullName()) { + LOG(1) << "replica set ghost target no good" << endl; + return; + } + + try { + if (!slave->reader.haveCursor()) { + if (!slave->reader.connect(id, slave->slave->id(), target->fullName())) { + // error message logged in OplogReader::connect + return; + } + slave->reader.ghostQueryGTE(rsoplog, last); + } + + LOG(1) << "replSet last: " << slave->last.toString() << " to " << last.toString() << rsLog; + if (slave->last > last) { + return; + } + + while (slave->last <= last) { + if (!slave->reader.more()) { + // we'll be back + return; + } + + BSONObj o = slave->reader.nextSafe(); + slave->last = o["ts"]._opTime(); + } + LOG(2) << "now last is " << slave->last.toString() << rsLog; + } + catch (DBException& e) { + // we'll be back + LOG(2) << "replSet ghost sync error: " << e.what() << " for " + << slave->slave->fullName() << rsLog; + slave->reader.resetConnection(); + } + } +} diff --git a/src/mongo/db/repl/test.html b/src/mongo/db/repl/test.html new file mode 100644 index 00000000000..295ad2ef0e0 --- /dev/null +++ b/src/mongo/db/repl/test.html @@ -0,0 +1,11 @@ +<HTML>
<BODY>
<!-- Developer convenience page: renders the _replSet status pages of two
     locally-running mongod instances (ports 28000 and 28001) side by side
     via the mongod HTTP interface. -->
<!-- see also jstests/rs/ -->
<iframe src="http://127.0.0.1:28000/_replSet" width="100%" height="50%" frameborder=1>
</iframe>

<iframe src="http://127.0.0.1:28001/_replSet" width="100%" height="50%" frameborder=1>
</iframe>

</BODY>
</HTML>
diff --git a/src/mongo/db/repl/testing.js b/src/mongo/db/repl/testing.js new file mode 100644 index 00000000000..d741cf3a644 --- /dev/null +++ b/src/mongo/db/repl/testing.js @@ -0,0 +1,42 @@ +// helpers for testing repl sets
// run with:
//   mongo --shell <host:port> testing.js
+
// sample replica set config (set name 'asdf', two members on host dm_hp)
cfg = {
    _id: 'asdf',
    members: [
        { _id : 0, host : "dm_hp" },
        { _id : 2, host : "dm_hp:27002" }
    ]
};
// alternate sample config for the same set, with members on host dmthink
c2 = {
    _id: 'asdf',
    members: [
        { _id: 0, host: "dmthink" },
        { _id: 2, host: "dmthink:27002" }
    ]
};
+
// switch the shell's default db to admin; keep a handle to 'local' as well
db = db.getSisterDB("admin");
local = db.getSisterDB("local");

// print a cheat sheet of the helpers defined in this file
print("\n\ndb = admin db on localhost:27017");
print("b = admin on localhost:27002");
print("rc(x) = db.runCommand(x)");
print("cfg = samp replset config");
print("i() = replSetInitiate(cfg)");
print("ism() = rc('ismaster')");
print("\n\n");

// rc: shorthand for running a command against the admin db
function rc(c) { return db.runCommand(c); }
// i: initiate the replica set using the sample config above
function i() { return rc({ replSetInitiate: cfg }); }
// ism: report this node's isMaster status
function ism() { return rc("isMaster"); }
+
// Best-effort handle to the second mongod's admin db; left as 0 when that
// instance is unreachable.
b = 0;
try {
    var second = new Mongo("localhost:27002");
    b = second.getDB("admin");
}
catch (e) {
    print("\nCouldn't connect to b mongod instance\n");
}
|