Diffstat (limited to 'src/mongo/db/repl/consensus.cpp')
-rw-r--r--  src/mongo/db/repl/consensus.cpp  449
1 file changed, 449 insertions(+), 0 deletions(-)
diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp
new file mode 100644
index 00000000000..3995373f5ef
--- /dev/null
+++ b/src/mongo/db/repl/consensus.cpp
@@ -0,0 +1,449 @@
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../commands.h"
+#include "rs.h"
+#include "multicmd.h"
+
+namespace mongo {
+
+ class CmdReplSetFresh : public ReplSetCommand {
+ public:
+ CmdReplSetFresh() : ReplSetCommand("replSetFresh") { }
+ private:
+
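+ /* would we veto an election of the member described in cmdObj?
+ returns true if so; most branches also set errmsg to the reason for the veto. */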
+ bool shouldVeto(const BSONObj& cmdObj, string& errmsg) {
+ // don't veto older versions; they won't be looking for the veto field and
+ // don't send an "id", so check for it before trying to parse it
+ if( cmdObj["id"].eoo() ) {
+ return false;
+ }
+
+ unsigned id = cmdObj["id"].Int();
+ const Member* primary = theReplSet->box.getPrimary();
+ const Member* hopeful = theReplSet->findById(id);
+ const Member *highestPriority = theReplSet->getMostElectable();
+
+ if( !hopeful ) {
+ errmsg = str::stream() << "replSet couldn't find member with id " << id;
+ return true;
+ }
+ else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
+ // hbinfo is not updated, so we have to check the primary's last optime separately
+ errmsg = str::stream() << "I am already primary, " << hopeful->fullName() <<
+ " can try again once I've stepped down";
+ return true;
+ }
+ else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
+ // other members might be aware of more up-to-date nodes
+ errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " <<
+ primary->fullName() << " is already primary and more up-to-date";
+ return true;
+ }
+ else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
+ errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
+ return true;
+ }
+
+ if( !theReplSet->isElectable(id) ) {
+ return true;
+ }
+
+ return false;
+ }
+
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+
+ if( cmdObj["set"].String() != theReplSet->name() ) {
+ errmsg = "wrong repl set name";
+ return false;
+ }
+ string who = cmdObj["who"].String();
+ int cfgver = cmdObj["cfgver"].Int();
+ OpTime opTime(cmdObj["opTime"].Date());
+
+ bool weAreFresher = false;
+ if( theReplSet->config().version > cfgver ) {
+ log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog;
+ result.append("info", "config version stale");
+ weAreFresher = true;
+ }
+ // check not only our own optime, but any other member we can reach
+ else if( opTime < theReplSet->lastOpTimeWritten ||
+ opTime < theReplSet->lastOtherOpTime()) {
+ weAreFresher = true;
+ }
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
+ result.append("fresher", weAreFresher);
+ result.append("veto", shouldVeto(cmdObj, errmsg));
+
+ return true;
+ }
+ } cmdReplSetFresh;
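+
+ /* for reference, the shape of a replSetFresh exchange, as built by weAreFreshest() below
+ (illustrative values only, not captured from a live set):
+ command: { replSetFresh: 1, set: "rs0", opTime: <Date>, who: "h1:27017", cfgver: 1, id: 3 }
+ reply: { opTime: <Date>, fresher: false, veto: false, ok: 1 }
+ "info" and "errmsg" may also appear in the reply, as appended above. */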
+
+ class CmdReplSetElect : public ReplSetCommand {
+ public:
+ CmdReplSetElect() : ReplSetCommand("replSetElect") { }
+ private:
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ theReplSet->elect.electCmdReceived(cmdObj, &result);
+ return true;
+ }
+ } cmdReplSetElect;
+
+ int Consensus::totalVotes() const {
+ static int complain = 0;
+ int vTot = rs._self->config().votes;
+ for( Member *m = rs.head(); m; m=m->next() )
+ vTot += m->config().votes;
+ if( vTot % 2 == 0 && vTot && complain++ == 0 )
+ log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog;
+ return vTot;
+ }
+
+ bool Consensus::aMajoritySeemsToBeUp() const {
+ int vUp = rs._self->config().votes;
+ for( Member *m = rs.head(); m; m=m->next() )
+ vUp += m->hbinfo().up() ? m->config().votes : 0;
+ return vUp * 2 > totalVotes();
+ }
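+
+ /* the check above and shouldRelinquish() below use the same strict-majority arithmetic:
+ e.g. with five members of one vote each, totalVotes() is 5, and vUp * 2 > 5 requires
+ at least three voting members to be up. */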
+
+ bool Consensus::shouldRelinquish() const {
+ int vUp = rs._self->config().votes;
+ const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries;
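+ // T spans one full heartbeat retry cycle; a member that has been unreachable for
+ // less than T still counts toward the majority below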
+ for( Member *m = rs.head(); m; m=m->next() ) {
+ long long dt = m->hbinfo().timeDown();
+ if( dt < T )
+ vUp += m->config().votes;
+ }
+
+ // the manager will handle calling stepdown if another node should be
+ // primary due to priority
+
+ return !( vUp * 2 > totalVotes() );
+ }
+
+ static const int VETO = -10000;
+
+ const time_t LeaseTime = 30;
+
+ SimpleMutex Consensus::lyMutex("ly");
+
+ unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */
+ SimpleMutex::scoped_lock lk(lyMutex);
+ LastYea &L = this->ly.ref(lk);
+ time_t now = time(0);
+ if( L.when + LeaseTime >= now && L.who != memberId ) {
+ LOG(1) << "replSet not voting yea for " << memberId <<
+ " voted for " << L.who << ' ' << now-L.when << " secs ago" << rsLog;
+ throw VoteException();
+ }
+ L.when = now;
+ L.who = memberId;
+ return rs._self->config().votes;
+ }
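+
+ /* example of the lease logic above: if we voted yea for member 3 at time t, then until
+ t + 30 seconds a yea() request on behalf of any other member throws VoteException,
+ while member 3 itself may renew the lease. */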
+
+ /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in
+ place instead of leaving it for a long time.
+ */
+ void Consensus::electionFailed(unsigned meid) {
+ SimpleMutex::scoped_lock lk(lyMutex);
+ LastYea &L = ly.ref(lk);
+ DEV assert( L.who == meid ); // this may not always hold, so be aware, but adding for now as a quick sanity test
+ if( L.who == meid )
+ L.when = 0;
+ }
+
+ /* todo: threading **************** !!!!!!!!!!!!!!!! */
+ void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
+ BSONObjBuilder& b = *_b;
+ DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
+ else LOG(2) << "replSet received elect msg " << cmd.toString() << rsLog;
+ string set = cmd["set"].String();
+ unsigned whoid = cmd["whoid"].Int();
+ int cfgver = cmd["cfgver"].Int();
+ OID round = cmd["round"].OID();
+ int myver = rs.config().version;
+
+ const Member* primary = rs.box.getPrimary();
+ const Member* hopeful = rs.findById(whoid);
+ const Member* highestPriority = rs.getMostElectable();
+
+ int vote = 0;
+ if( set != rs.name() ) {
+ log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
+ }
+ else if( myver < cfgver ) {
+ // we are stale. don't vote
+ }
+ else if( myver > cfgver ) {
+ // they are stale!
+ log() << "replSet electCmdReceived info got stale version # during election" << rsLog;
+ vote = VETO;
+ }
+ else if( !hopeful ) {
+ log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog;
+ vote = VETO;
+ }
+ else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
+ // hbinfo is not updated, so we have to check the primary's last optime separately
+ log() << "I am already primary, " << hopeful->fullName()
+ << " can try again once I've stepped down" << rsLog;
+ vote = VETO;
+ }
+ else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
+ // other members might be aware of more up-to-date nodes
+ log() << hopeful->fullName() << " is trying to elect itself but " <<
+ primary->fullName() << " is already primary and more up-to-date" << rsLog;
+ vote = VETO;
+ }
+ else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
+ log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName() << rsLog;
+ vote = VETO;
+ }
+ else {
+ try {
+ vote = yea(whoid);
+ dassert( hopeful->id() == whoid );
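+ // if we are currently primary, step down so the hopeful can take over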
+ rs.relinquish();
+ log() << "replSet info voting yea for " << hopeful->fullName() << " (" << whoid << ')' << rsLog;
+ }
+ catch(VoteException&) {
+ log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog;
+ }
+ }
+
+ b.append("vote", vote);
+ b.append("round", round);
+ }
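+
+ /* the reply to replSetElect is thus simply { vote: <n>, round: <OID> } (plus whatever the
+ command framework appends); _electSelf() below sums the vote fields from all responders. */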
+
+ void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) {
+ configVersion = config().version;
+ for( Member *m = head(); m; m=m->next() )
+ if( m->hbinfo().maybeUp() )
+ L.push_back( Target(m->fullName()) );
+ }
+
+ /* the config version is returned so that it is ok to use this unlocked; BUT, if unlocked,
+ you would need to check later that the config didn't change. */
+ void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) {
+ if( lockedByMe() ) {
+ _getTargets(L, configVersion);
+ return;
+ }
+ lock lk(this);
+ _getTargets(L, configVersion);
+ }
+
+ /* Do we have the newest data of them all?
+ @param allUp - set to true if all members are up; only meaningful if true is returned.
+ @return true if we are freshest. Note we may tie.
+ */
+ bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
+ const OpTime ord = theReplSet->lastOpTimeWritten;
+ nTies = 0;
+ assert( !ord.isNull() );
+ BSONObj cmd = BSON(
+ "replSetFresh" << 1 <<
+ "set" << rs.name() <<
+ "opTime" << Date_t(ord.asDate()) <<
+ "who" << rs._self->fullName() <<
+ "cfgver" << rs._cfg->version <<
+ "id" << rs._self->id());
+ list<Target> L;
+ int ver;
+ /* the following queries arbiters, even though they are never fresh. that doesn't quite make
+ sense, but it could if arbiters one day "know" what freshness is - so consider removing
+ arbiters from getTargets() here. getTargets() is also used for elections, though, and there
+ arbiters certainly are targets, so an "includeArbs" bool would be necessary to stop
+ fetching them here.
+ */
+ rs.getTargets(L, ver);
+ multiCommand(cmd, L);
+ int nok = 0;
+ allUp = true;
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ if( i->ok ) {
+ nok++;
+ if( i->result["fresher"].trueValue() ) {
+ log() << "not electing self, we are not freshest" << rsLog;
+ return false;
+ }
+ OpTime remoteOrd( i->result["opTime"].Date() );
+ if( remoteOrd == ord )
+ nTies++;
+ assert( remoteOrd <= ord );
+
+ if( i->result["veto"].trueValue() ) {
+ BSONElement msg = i->result["errmsg"];
+ if (!msg.eoo()) {
+ log() << "not electing self, " << i->toHost << " would veto with '" <<
+ msg.String() << "'" << rsLog;
+ }
+ else {
+ log() << "not electing self, " << i->toHost << " would veto" << rsLog;
+ }
+ return false;
+ }
+ }
+ else {
+ DEV log() << "replSet freshest returns " << i->result.toString() << rsLog;
+ allUp = false;
+ }
+ }
+ LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
+ assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
+ return true;
+ }
+
+ extern time_t started;
+
+ void Consensus::multiCommand(BSONObj cmd, list<Target>& L) {
+ assert( !rs.lockedByMe() );
+ mongo::multiCommand(cmd, L);
+ }
+
+ void Consensus::_electSelf() {
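+ // don't try to elect ourself while a stepdown period is still in effect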
+ if( time(0) < steppedDown )
+ return;
+
+ {
+ const OpTime ord = theReplSet->lastOpTimeWritten;
+ if( ord == 0 ) {
+ log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog;
+ return;
+ }
+ }
+
+ bool allUp;
+ int nTies;
+ if( !weAreFreshest(allUp, nTies) ) {
+ return;
+ }
+
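+ // clear any stale heartbeat status message before proceeding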
+ rs.sethbmsg("",9);
+
+ if( !allUp && time(0) - started < 60 * 5 ) {
+ /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
+ if we don't have to -- we'd rather be offline and wait a little longer instead
+ todo: make this configurable.
+ */
+ rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes");
+ return;
+ }
+
+ Member& me = *rs._self;
+
+ if( nTies ) {
+ /* tie? we then randomly sleep to try to not collide on our voting. */
+ /* todo: smarter. */
+ if( me.id() == 0 || sleptLast ) {
+ // would be fine for one node not to sleep
+ // todo: biggest / highest priority nodes should be the ones that get to not sleep
+ }
+ else {
+ assert( !rs.lockedByMe() ); // bad to go to sleep locked
+ unsigned ms = ((unsigned) rand()) % 1000 + 50;
+ DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog;
+ sleptLast = true;
+ sleepmillis(ms);
+ throw RetryAfterSleepException();
+ }
+ }
+ sleptLast = false;
+
+ time_t start = time(0);
+ unsigned meid = me.id();
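+ // vote for ourself first; this also takes the 30 second yea lease above, so we will
+ // not grant our vote to another candidate while this election is in progress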
+ int tally = yea( meid );
+ bool success = false;
+ try {
+ log() << "replSet info electSelf " << meid << rsLog;
+
+ BSONObj electCmd = BSON(
+ "replSetElect" << 1 <<
+ "set" << rs.name() <<
+ "who" << me.fullName() <<
+ "whoid" << me.hbinfo().id() <<
+ "cfgver" << rs._cfg->version <<
+ "round" << OID::gen() /* this is just for diagnostics */
+ );
+
+ int configVersion;
+ list<Target> L;
+ rs.getTargets(L, configVersion);
+ multiCommand(electCmd, L);
+
+ {
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ DEV log() << "replSet elect res: " << i->result.toString() << rsLog;
+ if( i->ok ) {
+ int v = i->result["vote"].Int();
+ tally += v;
+ }
+ }
+ if( tally*2 <= totalVotes() ) {
+ log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog;
+ }
+ else if( time(0) - start > 30 ) {
+ // defensive; should never happen as we have timeouts on connection and operation for our conn
+ log() << "replSet too much time passed during our election, ignoring result" << rsLog;
+ }
+ else if( configVersion != rs.config().version ) {
+ log() << "replSet config version changed during our election, ignoring result" << rsLog;
+ }
+ else {
+ /* succeeded. */
+ LOG(1) << "replSet election succeeded, assuming primary role" << rsLog;
+ success = true;
+ rs.assumePrimary();
+ }
+ }
+ }
+ catch( std::exception& ) {
+ if( !success ) electionFailed(meid);
+ throw;
+ }
+ if( !success ) electionFailed(meid);
+ }
+
+ void Consensus::electSelf() {
+ assert( !rs.lockedByMe() );
+ assert( !rs.myConfig().arbiterOnly );
+ assert( rs.myConfig().slaveDelay == 0 );
+ try {
+ _electSelf();
+ }
+ catch(RetryAfterSleepException&) {
+ throw;
+ }
+ catch(VoteException& ) {
+ log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog;
+ }
+ catch(DBException& e) {
+ log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog;
+ }
+ catch(...) {
+ log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
+ }
+ }
+
+}