diff options
author | Spencer T Brody <spencer@mongodb.com> | 2014-11-07 18:19:27 -0500 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2014-11-20 11:38:11 -0500 |
commit | 10cf936a3635a72ee61714631050cf54466410eb (patch) | |
tree | fa59813385647ff01eb1124fb0506685a273b5cc /src/mongo/db | |
parent | 13577a48b51202aabd2e55ef95404439aaa4a0c3 (diff) | |
download | mongo-10cf936a3635a72ee61714631050cf54466410eb.tar.gz |
SERVER-15496 Remove legacy replication code
Diffstat (limited to 'src/mongo/db')
49 files changed, 44 insertions, 7111 deletions
diff --git a/src/mongo/db/commands/auth_schema_upgrade_d.cpp b/src/mongo/db/commands/auth_schema_upgrade_d.cpp index f00d190fee9..e5ff933b651 100644 --- a/src/mongo/db/commands/auth_schema_upgrade_d.cpp +++ b/src/mongo/db/commands/auth_schema_upgrade_d.cpp @@ -37,8 +37,6 @@ #include "mongo/db/commands/user_management_commands.h" #include "mongo/db/repl/multicmd.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rs_config.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/version.h" diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 93695a328b5..482bb0f716d 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -71,8 +71,12 @@ #include "mongo/db/operation_context_impl.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/range_deleter_service.h" +#include "mongo/db/repl/network_interface_impl.h" +#include "mongo/db/repl/repl_coordinator_external_state_impl.h" #include "mongo/db/repl/repl_coordinator_global.h" +#include "mongo/db/repl/repl_coordinator_impl.h" #include "mongo/db/repl/repl_settings.h" +#include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/restapi.h" #include "mongo/db/server_parameters.h" #include "mongo/db/startup_warnings_mongod.h" @@ -769,6 +773,17 @@ MONGO_INITIALIZER_GENERAL(CreateAuthorizationManager, return Status::OK(); } +MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnvironment")) + (InitializerContext* context) { + repl::setGlobalReplicationCoordinator(new repl::ReplicationCoordinatorImpl( + getGlobalReplSettings(), + new repl::ReplicationCoordinatorExternalStateImpl, + new repl::NetworkInterfaceImpl, + new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)), + static_cast<int64_t>(curTimeMillis64()))); + return Status::OK(); +} + #ifdef MONGO_SSL MONGO_INITIALIZER_GENERAL(setSSLManagerType, MONGO_NO_PREREQUISITES, diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 1a1d4323564..cf6be9c7cac 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -42,7 +42,6 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/repl_coordinator_impl.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/rslog.h" diff --git a/src/mongo/db/repl/consensus.cpp b/src/mongo/db/repl/consensus.cpp deleted file mode 100644 index e031c4a05a8..00000000000 --- a/src/mongo/db/repl/consensus.cpp +++ /dev/null @@ -1,377 +0,0 @@ -/** -* Copyright (C) 2010 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/consensus.h" - -#include "mongo/base/string_data.h" -#include "mongo/db/global_optime.h" -#include "mongo/db/repl/multicmd.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/replset_commands.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/util/log.h" - -namespace mongo { - -namespace repl { - - void Consensus::electCmdReceived(const StringData& set, - unsigned whoid, - int cfgver, - const OID& round, - BSONObjBuilder* _b) { - BSONObjBuilder& b = *_b; - int myver = rs.config().version; - - const Member* primary = rs.box.getPrimary(); - const Member* hopeful = rs.findById(whoid); - const Member* highestPriority = rs.getMostElectable(); - - int vote = 0; - if( set != rs.name() ) { - log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; - } - else if( myver < cfgver ) { - // we are stale. don't vote - } - else if( myver > cfgver ) { - // they are stale! - log() << "replSet electCmdReceived info got stale version # during election" << rsLog; - vote = -10000; - } - else if( !hopeful ) { - log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog; - vote = -10000; - } - else if( primary && primary == rs._self) { - log() << "I am already primary, " << hopeful->fullName() - << " can try again once I've stepped down" << rsLog; - vote = -10000; - } - else if (primary) { - log() << hopeful->fullName() << " is trying to elect itself but " << - primary->fullName() << " is already primary" << rsLog; - vote = -10000; - } - else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) { - log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName(); - vote = -10000; - } - else { - try { - vote = _yea(whoid); - dassert( hopeful->id() == whoid ); - log() << "replSet info voting yea for " << hopeful->fullName() << " (" << whoid << ')' << rsLog; - } - catch(VoteException&) { - log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog; - } - } - - b.append("vote", vote); - b.append("round", round); - } - - int Consensus::_totalVotes() const { - static int complain = 0; - int vTot = rs._self->config().votes; - for( Member *m = rs.head(); m; m=m->next() ) - vTot += m->config().votes; - if( vTot % 2 == 0 && vTot && complain++ == 0 ) - log() << "replSet warning: even number of voting members in replica set config - " - "add an arbiter or set votes to 0 on one of the existing members" << rsLog; - return vTot; - } - - bool Consensus::aMajoritySeemsToBeUp() const { - int vUp = rs._self->config().votes; - for( Member *m = rs.head(); m; m=m->next() ) - vUp += m->hbinfo().up() ? m->config().votes : 0; - return vUp * 2 > _totalVotes(); - } - - bool Consensus::shouldRelinquish() const { - int vUp = rs._self->config().votes; - for( Member *m = rs.head(); m; m=m->next() ) { - if (m->hbinfo().up()) { - vUp += m->config().votes; - } - } - - // the manager will handle calling stepdown if another node should be - // primary due to priority - - return !( vUp * 2 > _totalVotes() ); - } - - const time_t LeaseTime = 30; - - SimpleMutex Consensus::lyMutex("ly"); - - unsigned Consensus::_yea(unsigned memberId) { /* throws VoteException */ - SimpleMutex::scoped_lock lk(lyMutex); - LastYea &L = _ly; - time_t now = time(0); - if( L.when + LeaseTime >= now && L.who != memberId ) { - LOG(1) << "replSet not voting yea for " << memberId << - " voted for " << L.who << ' ' << now-L.when << " secs ago" << rsLog; - throw VoteException(); - } - L.when = now; - L.who = memberId; - return rs._self->config().votes; - } - - /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in - place instead of leaving it for a long time. - */ - void Consensus::_electionFailed(unsigned meid) { - SimpleMutex::scoped_lock lk(lyMutex); - LastYea &L = _ly; - DEV verify( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test - if( L.who == meid ) - L.when = 0; - } - - void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) { - configVersion = config().version; - for( Member *m = head(); m; m=m->next() ) - if( m->hbinfo().maybeUp() ) - L.push_back( Target(m->fullName()) ); - } - - /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need - to check later that the config didn't change. */ - void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) { - if( lockedByMe() ) { - _getTargets(L, configVersion); - return; - } - lock lk(this); - _getTargets(L, configVersion); - } - - /* Do we have the newest data of them all? - @param allUp - set to true if all members are up. Only set if true returned. - @return true if we are freshest. Note we may tie. - */ - bool Consensus::_weAreFreshest(bool& allUp, int& nTies) { - const OpTime ord = theReplSet->lastOpTimeWritten; - nTies = 0; - verify( !ord.isNull() ); - BSONObj cmd = BSON( - "replSetFresh" << 1 << - "set" << rs.name() << - "opTime" << Date_t(ord.asDate()) << - "who" << rs._self->fullName() << - "cfgver" << rs._cfg->version << - "id" << rs._self->id()); - list<Target> L; - int ver; - /* the following queries arbiters, even though they are never fresh. wonder if that makes sense. - it doesn't, but it could, if they "know" what freshness it one day. so consider removing - arbiters from getTargets() here. although getTargets is used elsewhere for elections; there - arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make - not fetching them herein happen. - */ - rs.getTargets(L, ver); - _multiCommand(cmd, L); - int nok = 0; - allUp = true; - for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { - if( i->ok ) { - nok++; - if( i->result["fresher"].trueValue() ) { - log() << "not electing self, we are not freshest" << rsLog; - return false; - } - OpTime remoteOrd( i->result["opTime"].Date() ); - if( remoteOrd == ord ) - nTies++; - verify( remoteOrd <= ord ); - - if( i->result["veto"].trueValue() ) { - BSONElement msg = i->result["errmsg"]; - if (!msg.eoo()) { - log() << "not electing self, " << i->toHost << " would veto with '" << - msg.String() << "'" << rsLog; - } - else { - log() << "not electing self, " << i->toHost << " would veto" << rsLog; - } - return false; - } - } - else { - DEV log() << "replSet freshest returns " << i->result.toString() << rsLog; - allUp = false; - } - } - LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; - verify( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working... - return true; - } - - void Consensus::_multiCommand(BSONObj cmd, list<Target>& L) { - verify( !rs.lockedByMe() ); - multiCommand(cmd, L); - } - - void Consensus::_electSelf() { - if( time(0) < steppedDown ) - return; - - { - const OpTime ord = theReplSet->lastOpTimeWritten; - if( ord == 0 ) { - log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog; - return; - } - } - - bool allUp; - int nTies; - if( !_weAreFreshest(allUp, nTies) ) { - return; - } - - rs.sethbmsg("",9); - - if (!allUp && time(0) - serverGlobalParams.started < 60 * 5) { - /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data - if we don't have to -- we'd rather be offline and wait a little longer instead - todo: make this configurable. - */ - rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes"); - return; - } - - Member& me = *rs._self; - - if( nTies ) { - /* tie? we then randomly sleep to try to not collide on our voting. */ - /* todo: smarter. */ - if( me.id() == 0 || _sleptLast ) { - // would be fine for one node not to sleep - // todo: biggest / highest priority nodes should be the ones that get to not sleep - } - else { - verify( !rs.lockedByMe() ); // bad to go to sleep locked - unsigned ms = ((unsigned) rand()) % 1000 + 50; - DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog; - _sleptLast = true; - sleepmillis(ms); - throw RetryAfterSleepException(); - } - } - _sleptLast = false; - - time_t start = time(0); - unsigned meid = me.id(); - int tally = _yea( meid ); - bool success = false; - try { - log() << "replSet info electSelf " << meid << rsLog; - - BSONObj electCmd = BSON( - "replSetElect" << 1 << - "set" << rs.name() << - "who" << me.fullName() << - "whoid" << me.hbinfo().id() << - "cfgver" << rs._cfg->version << - "round" << OID::gen() /* this is just for diagnostics */ - ); - - int configVersion; - list<Target> L; - rs.getTargets(L, configVersion); - _multiCommand(electCmd, L); - - { - for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { - LOG(1) << "replSet elect res: " << i->result.toString() << rsLog; - if( i->ok ) { - int v = i->result["vote"].Int(); - tally += v; - } - } - if( tally*2 <= _totalVotes() ) { - log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog; - } - else if( time(0) - start > 30 ) { - // defensive; should never happen as we have timeouts on connection and operation for our conn - log() << "replSet too much time passed during our election, ignoring result" << rsLog; - } - else if( configVersion != rs.config().version ) { - log() << "replSet config version changed during our election, ignoring result" << rsLog; - } - else { - /* succeeded. */ - LOG(1) << "replSet election succeeded, assuming primary role" << rsLog; - success = true; - - setElectionTime(getNextGlobalOptime()); - - rs._assumePrimary(); - } - } - } - catch( std::exception& ) { - if( !success ) _electionFailed(meid); - throw; - } - if( !success ) _electionFailed(meid); - } - - void Consensus::electSelf() { - verify( !rs.lockedByMe() ); - verify( !rs.myConfig().arbiterOnly ); - verify( rs.myConfig().slaveDelay == 0 ); - try { - _electSelf(); - } - catch(RetryAfterSleepException&) { - throw; - } - catch(VoteException& ) { - log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog; - } - catch(DBException& e) { - log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog; - } - catch(...) { - log() << "replSet warning caught unexpected exception in electSelf()" << rsLog; - } - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/consensus.h b/src/mongo/db/repl/consensus.h deleted file mode 100644 index 37c194c1d7d..00000000000 --- a/src/mongo/db/repl/consensus.h +++ /dev/null @@ -1,95 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include <ctime> -#include <list> - -#include "mongo/util/concurrency/mutex.h" -#include "mongo/bson/oid.h" -#include "mongo/bson/optime.h" -#include "mongo/bson/bsonobj.h" - -namespace mongo { -namespace repl { - - class ReplSetImpl; - struct Target; - - class Consensus { - private: - ReplSetImpl &rs; - struct LastYea { - LastYea() : when(0), who(0xffffffff) { } - time_t when; - unsigned who; - }; - static SimpleMutex lyMutex; - LastYea _ly; - unsigned _yea(unsigned memberId); // throws VoteException - void _electionFailed(unsigned meid); - void _electSelf(); - bool _weAreFreshest(bool& allUp, int& nTies); - bool _sleptLast; // slept last elect() pass - - // This is a unique id that is changed each time we transition to PRIMARY, as the - // result of an election. - OID _electionId; - // PRIMARY server's time when the election to primary occurred - OpTime _electionTime; - - int _totalVotes() const; - void _multiCommand(BSONObj cmd, std::list<Target>& L); - public: - Consensus(ReplSetImpl *t) : rs(*t) { - _sleptLast = false; - steppedDown = 0; - } - - /* if we've stepped down, this is when we are allowed to try to elect ourself again. - todo: handle possible weirdnesses at clock skews etc. - */ - time_t steppedDown; - - bool aMajoritySeemsToBeUp() const; - bool shouldRelinquish() const; - void electSelf(); - void electCmdReceived(const StringData& set, - unsigned whoid, - int cfgver, - const OID& round, - BSONObjBuilder* result); - - OID getElectionId() const { return _electionId; } - void setElectionId(OID oid) { _electionId = oid; } - OpTime getElectionTime() const { return _electionTime; } - void setElectionTime(OpTime electionTime) { _electionTime = electionTime; } - }; -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp deleted file mode 100644 index 68ff1f03a7e..00000000000 --- a/src/mongo/db/repl/health.cpp +++ /dev/null @@ -1,466 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful,b -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/health.h" - -#include "mongo/client/connpool.h" -#include "mongo/db/commands.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/member.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplogreader.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/replset_commands.h" -#include "mongo/util/background.h" -#include "mongo/util/concurrency/task.h" -#include "mongo/util/goodies.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/html.h" - -namespace mongo { -namespace repl { - /* decls for connections.h */ - ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); - mutex ScopedConn::mapMutex("ScopedConn::mapMutex"); - - using namespace html; - - string ago(time_t t) { - if( t == 0 ) return ""; - - time_t x = time(0) - t; - stringstream s; - if( x < 180 ) { - s << x << " sec"; - if( x != 1 ) s << 's'; - } - else if( x < 3600 ) { - s.precision(2); - s << x / 60.0 << " mins"; - } - else { - s.precision(2); - s << x / 3600.0 << " hrs"; - } - return s.str(); - } - - string ReplSetImpl::stateAsHtml(MemberState s) { - if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP"); - if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY"); - if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY"); - if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING"); - if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2"); - if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER"); - if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN"); - if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK"); - return ""; - } - - // oplogdiags in web ui - static void say(stringstream&ss, const bo& op) { - ss << "<tr>"; - - set<string> skip; - be e = op["ts"]; - if( e.type() == Date || e.type() == Timestamp ) { - OpTime ot = e._opTime(); - ss << td( time_t_to_String_short( ot.getSecs() ) ); - ss << td( ot.toString() ); - skip.insert("ts"); - } - else ss << td("?") << td("?"); - - e = op["h"]; - if( e.type() == NumberLong ) { - ss << "<td>" << hex << e.Long() << "</td>\n"; - skip.insert("h"); - } - else - ss << td("?"); - - ss << td(op["op"].valuestrsafe()); - ss << td(op["ns"].valuestrsafe()); - skip.insert("op"); - skip.insert("ns"); - - ss << "<td>"; - for( bo::iterator i(op); i.more(); ) { - be e = i.next(); - if( skip.count(e.fieldName()) ) continue; - ss << e.toString() << ' '; - } - ss << "</td></tr>\n"; - } - - void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { - const Member *m = findById(server_id); - if( m == 0 ) { - ss << "Error : can't find a member with id: " << server_id << '\n'; - return; - } - - ss << p("Server : " + m->fullName() + "<br>ns : " + rsoplog ); - - //const bo fields = BSON( "o" << false << "o2" << false ); - const bo fields; - - /** todo fix we might want an so timeout here */ - OplogReader reader; - - if (reader.connect(m->h()) == false) { - ss << "couldn't connect to " << m->fullName(); - return; - } - - reader.query(rsoplog, Query().sort("$natural",1), 20, 0, &fields); - if ( !reader.haveCursor() ) { - ss << "couldn't query " << rsoplog; - return; - } - static const char *h[] = {"ts","optime","h","op","ns","rest",0}; - - ss << "<style type=\"text/css\" media=\"screen\">" - "table { font-size:75% }\n" - // "th { background-color:#bbb; color:#000 }\n" - // "td,th { padding:.25em }\n" - "</style>\n"; - - ss << table(h, true); - //ss << "<pre>\n"; - int n = 0; - OpTime otFirst; - OpTime otLast; - OpTime otEnd; - while( reader.more() ) { - bo o = reader.next(); - otLast = o["ts"]._opTime(); - if( otFirst.isNull() ) - otFirst = otLast; - say(ss, o); - n++; - } - if( n == 0 ) { - ss << rsoplog << " is empty\n"; - } - else { - reader.query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields); - if( !reader.haveCursor() ) { - ss << "couldn't query [2] " << rsoplog; - return; - } - string x; - bo o = reader.next(); - otEnd = o["ts"]._opTime(); - while( 1 ) { - stringstream z; - if( o["ts"]._opTime() == otLast ) - break; - say(z, o); - x = z.str() + x; - if( !reader.more() ) - break; - o = reader.next(); - } - if( !x.empty() ) { - ss << "<tr><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n" << x; - //ss << "\n...\n\n" << x; - } - } - ss << _table(); - ss << p(time_t_to_String_short(time(0)) + " current time"); - - if( !otEnd.isNull() ) { - ss << "<p>Log length in time: "; - unsigned d = otEnd.getSecs() - otFirst.getSecs(); - double h = d / 3600.0; - ss.precision(3); - if( h < 72 ) - ss << h << " hours"; - else - ss << h / 24.0 << " days"; - ss << "</p>\n"; - } - } - - void ReplSetImpl::_summarizeAsHtml(OperationContext* txn, stringstream& s) const { - s << table(0, false); - s << tr("Set name:", _name); - s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" ); - - // lag - const Member *primary = box.getPrimary(); - if (primary != 0 && primary != _self && !iAmArbiterOnly() && !lastOpTimeWritten.isNull()) { - int lag = primary->hbinfo().opTime.getSecs() - lastOpTimeWritten.getSecs(); - s << tr("Lag: ", str::stream() << lag << " secs"); - } - - s << _table(); - - const char *h[] = {"Member", - "<a title=\"member id in the replset config\">id</a>", - "Up", - "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>", - "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>", - "Votes", "Priority", "State", "Messages", - "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>", - "<a title=\"Not replication lag. Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign.\">clock skew</a>", - 0 - }; - s << table(h); - - /* this is to sort the member rows by their ordinal _id, so they show up in the same - order on all the different web ui's; that is less confusing for the operator. */ - map<int,string> mp; - - string myMinValid; - try { - readlocktry lk(txn->lockState(), /*"local.replset.minvalid", */300); - if( lk.got() ) { - BSONObj mv; - if( Helpers::getSingleton(txn, "local.replset.minvalid", mv) ) { - myMinValid = "minvalid:" + mv["ts"]._opTime().toString(); - } - } - else myMinValid = "."; - } - catch(...) { - myMinValid = "exception fetching minvalid"; - } - - const Member *_self = this->_self; - verify(_self); - { - stringstream s; - /* self row */ - s << tr() << td(_self->fullName() + " (me)") << - td(_self->id()) << - td("1") << //up - td(ago(serverGlobalParams.started)) << - td("") << // last heartbeat - td(ToString(_self->config().votes)) << - td(ToString(_self->config().priority)) << - td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") ); - s << td( _hbmsg ); - stringstream q; - q << "/_replSetOplog?_id=" << _self->id(); - s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) ); - s << td(""); // skew - s << _tr(); - mp[_self->hbinfo().id()] = s.str(); - } - Member *m = head(); - while( m ) { - stringstream s; - m->summarizeMember(s); - mp[m->hbinfo().id()] = s.str(); - m = m->next(); - } - - for( map<int,string>::const_iterator i = mp.begin(); i != mp.end(); i++ ) - s << i->second; - s << _table(); - } - - const Member* ReplSetImpl::findById(unsigned id) const { - if( _self && id == _self->id() ) return _self; - - for( Member *m = head(); m; m = m->next() ) - if( m->id() == id ) - return m; - return 0; - } - - Member* ReplSetImpl::getMutableMember(unsigned id) { - if( _self && id == _self->id() ) return _self; - - for( Member *m = head(); m; m = m->next() ) - if( m->id() == id ) - return m; - return 0; - } - - Member* ReplSetImpl::findByName(const std::string& hostname) const { - if (_self && hostname == _self->fullName()) { - return _self; - } - - for (Member *m = head(); m; m = m->next()) { - if (m->fullName() == hostname) { - return m; - } - } - - return NULL; - } - - const OpTime ReplSetImpl::lastOtherOpTime() const { - OpTime closest(0,0); - - for( Member *m = _members.head(); m; m=m->next() ) { - if (!m->hbinfo().up()) { - continue; - } - - if (m->hbinfo().opTime > closest) { - closest = m->hbinfo().opTime; - } - } - - return closest; - } - - const OpTime ReplSetImpl::lastOtherElectableOpTime() const { - OpTime closest(0,0); - - for( Member *m = _members.head(); m; m=m->next() ) { - if (!m->hbinfo().up()) { - continue; - } - - if (m->hbinfo().opTime > closest && m->config().potentiallyHot()) { - log() << m->fullName() << " is now closest at " - << m->hbinfo().opTime << endl; - closest = m->hbinfo().opTime; - } - } - - return closest; - } - - void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { - vector<BSONObj> v; - - const Member *_self = this->_self; - verify( _self ); - - MemberState myState = box.getState(); - const HostAndPort syncTarget = BackgroundSync::get()->getSyncTarget(); - - // add self - { - BSONObjBuilder bb; - bb.append("_id", (int) _self->id()); - bb.append("name", _self->fullName()); - bb.append("health", 1.0); - bb.append("state", (int)myState.s); - bb.append("stateStr", myState.toString()); - bb.append("uptime", (unsigned)(time(0) - serverGlobalParams.started)); - if (!_self->config().arbiterOnly) { - bb.appendTimestamp("optime", lastOpTimeWritten.asDate()); - bb.appendDate("optimeDate", lastOpTimeWritten.getSecs() * 1000LL); - } - - int maintenance = _maintenanceMode; - if (maintenance) { - bb.append("maintenanceMode", maintenance); - } - - if ( !syncTarget.empty() && - (myState != MemberState::RS_PRIMARY) && - (myState != MemberState::RS_REMOVED) ) { - bb.append("syncingTo", syncTarget.toString()); - } - - if (theReplSet) { - string s = theReplSet->hbmsg(); - if( !s.empty() ) - bb.append("infoMessage", s); - - if (myState == MemberState::RS_PRIMARY) { - bb.appendTimestamp("electionTime", theReplSet->getElectionTime().asDate()); - bb.appendDate("electionDate", theReplSet->getElectionTime().getSecs() * 1000LL); - } - } - bb.append("self", true); - v.push_back(bb.obj()); - } - - Member *m =_members.head(); - while( m ) { - BSONObjBuilder bb; - bb.append("_id", (int) m->id()); - bb.append("name", m->fullName()); - double h = m->hbinfo().health; - bb.append("health", h); - bb.append("state", (int) m->state().s); - if( h == 0 ) { - // if we can't connect the state info is from the past and could be confusing to show - bb.append("stateStr", "(not reachable/healthy)"); - } - else { - bb.append("stateStr", m->state().toString()); - } - bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0)); - if (!m->config().arbiterOnly) { - bb.appendTimestamp("optime", m->hbinfo().opTime.asDate()); - bb.appendDate("optimeDate", m->hbinfo().opTime.getSecs() * 1000LL); - } - bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat); - bb.appendTimeT("lastHeartbeatRecv", m->hbinfo().lastHeartbeatRecv); - bb.append("pingMs", m->hbinfo().ping); - string s = m->lhb(); - if( !s.empty() ) - bb.append("lastHeartbeatMessage", s); - - if (m->hbinfo().authIssue) { - bb.append("authenticated", false); - } - - string syncingTo = m->hbinfo().syncingTo; - if (!syncingTo.empty()) { - bb.append("syncingTo", syncingTo); - } - - if (m->state() == MemberState::RS_PRIMARY) { - bb.appendTimestamp("electionTime", m->hbinfo().electionTime.asDate()); - bb.appendDate("electionDate", m->hbinfo().electionTime.getSecs() * 1000LL); - } - - v.push_back(bb.obj()); - m = m->next(); - } - sort(v.begin(), v.end()); - b.append("set", name()); - b.appendTimeT("date", time(0)); - b.append("myState", myState.s); - if ( !syncTarget.empty() && - (myState != MemberState::RS_PRIMARY) && - (myState != MemberState::RS_REMOVED) ) { - b.append("syncingTo", syncTarget.toString()); - } - b.append("members", v); - } -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/health.h b/src/mongo/db/repl/health.h deleted file mode 100644 index bcd69ed5ea4..00000000000 --- a/src/mongo/db/repl/health.h +++ /dev/null @@ -1,41 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include <ctime> -#include <string> - -namespace mongo { -namespace repl { - - // helper function needed by member.cpp - std::string ago(time_t t); - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp deleted file mode 100644 index bf2b6ac8865..00000000000 --- a/src/mongo/db/repl/heartbeat.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful,b -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include <boost/thread/thread.hpp> - -#include "mongo/bson/util/bson_extract.h" -#include "mongo/db/commands.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/global_environment_experiment.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/heartbeat_info.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/repl_set_health_poll_task.h" -#include "mongo/db/repl/repl_set_heartbeat_args.h" -#include "mongo/db/repl/repl_set_heartbeat_response.h" -#include "mongo/db/repl/replset_commands.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rs_sync.h" -#include "mongo/db/repl/server.h" -#include "mongo/db/storage/storage_engine.h" -#include "mongo/util/background.h" -#include "mongo/util/concurrency/task.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/goodies.h" -#include "mongo/util/log.h" -#include "mongo/util/ramlog.h" - -namespace mongo { -namespace repl { - - MONGO_FP_DECLARE(rsStopHeartbeatRequest); - - bool requestHeartbeat(const std::string& setName, - const std::string& from, - const std::string& memberFullName, - BSONObj& result, - int myCfgVersion, - int& theirCfgVersion, - bool checkEmpty) { - MONGO_FAIL_POINT_BLOCK(rsStopHeartbeatRequest, member) { - const BSONObj& data = member.getData(); - const std::string& stopMember = data["member"].str(); - - if (memberFullName == stopMember) { - return false; - } - } - int me = -1; - if (theReplSet) { - me = theReplSet->selfId(); - } - - BSONObjBuilder cmdBuilder; - cmdBuilder.append("replSetHeartbeat", setName); - cmdBuilder.append("v", myCfgVersion); - cmdBuilder.append("pv", 1); - cmdBuilder.append("checkEmpty", checkEmpty); - cmdBuilder.append("from", from); - if (me > -1) { - cmdBuilder.append("fromId", me); - } - - ScopedConn conn(memberFullName); - return conn.runCommand("admin", cmdBuilder.done(), result, 0); - } - - void ReplSetImpl::endOldHealthTasks() { - unsigned sz = healthTasks.size(); - for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ ) - (*i)->halt(); - healthTasks.clear(); - if( sz ) - DEV log() << "replSet debug: cleared old tasks " << sz << endl; - } - - void ReplSetImpl::startHealthTaskFor(Member *m) { - DEV log() << "starting rsHealthPoll for " << m->fullName() << endl; - ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); - healthTasks.insert(task); - task::repeat(task, 2000); - } - - /** called during repl set startup. caller expects it to return fairly quickly. - note ReplSet object is only created once we get a config - so this won't run - until the initiation. - */ - void ReplSetImpl::startThreads() { - task::fork(mgr); - mgr->send( stdx::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); - - if (myConfig().arbiterOnly) { - return; - } - - // this ensures that will have bgsync's s_instance at all points where it is needed - // so that we needn't check for its existence - BackgroundSync* sync = BackgroundSync::get(); - - boost::thread t(runSyncThread); - - boost::thread producer(stdx::bind(&BackgroundSync::producerThread, sync)); - boost::thread feedback(stdx::bind(&SyncSourceFeedback::run, - &theReplSet->syncSourceFeedback)); - - // member heartbeats are started in ReplSetImpl::initFromConfig - } - -} // namespace repl -} // namespace mongo - -/* todo: - stop bg job and delete on removefromset -*/ diff --git a/src/mongo/db/repl/heartbeat.h b/src/mongo/db/repl/heartbeat.h deleted file mode 100644 index 73527ec9b9d..00000000000 --- a/src/mongo/db/repl/heartbeat.h +++ /dev/null @@ -1,47 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include <string> - -namespace mongo { - class BSONObj; - -namespace repl { - - /* throws */ - bool requestHeartbeat(const std::string& setname, - const std::string& fromHost, - const std::string& memberFullName, - BSONObj& result, - int myConfigVersion, - int& theirConfigVersion, - bool checkEmpty = false); -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/heartbeat_info.cpp b/src/mongo/db/repl/heartbeat_info.cpp deleted file mode 100644 index acc6ed1b8bc..00000000000 --- a/src/mongo/db/repl/heartbeat_info.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful,b -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include "mongo/pch.h" - -#include "mongo/db/repl/heartbeat_info.h" - -#include "mongo/util/fail_point_service.h" - -namespace mongo { -namespace repl { - - unsigned int HeartbeatInfo::numPings; - - HeartbeatInfo::HeartbeatInfo() : - hbstate(MemberState::RS_UNKNOWN), - health(-1.0), - upSince(0), - downSince(0), - lastHeartbeat(0), - lastHeartbeatRecv(0), - skew(INT_MIN), - authIssue(false), - ping(0), - _id(0xffffffff) { - } - - HeartbeatInfo::HeartbeatInfo(int id) : - hbstate(MemberState::RS_UNKNOWN), - health(-1.0), - upSince(0), - downSince(0), - lastHeartbeat(0), - lastHeartbeatRecv(0), - skew(INT_MIN), - authIssue(false), - ping(0), - _id(id) { - } - - bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { - return health != old.health || hbstate != old.hbstate; - } - - void HeartbeatInfo::updateFromLastPoll(const HeartbeatInfo& newInfo) { - hbstate = newInfo.hbstate; - health = newInfo.health; - upSince = newInfo.upSince; - downSince = newInfo.downSince; - lastHeartbeat = newInfo.lastHeartbeat; - lastHeartbeatMsg = newInfo.lastHeartbeatMsg; - // Note: lastHeartbeatRecv is updated through CmdReplSetHeartbeat::run(). - - syncingTo = newInfo.syncingTo; - opTime = newInfo.opTime; - skew = newInfo.skew; - authIssue = newInfo.authIssue; - ping = newInfo.ping; - electionTime = newInfo.electionTime; - } - - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/heartbeat_info.h b/src/mongo/db/repl/heartbeat_info.h deleted file mode 100644 index cc0370b8dd3..00000000000 --- a/src/mongo/db/repl/heartbeat_info.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (C) 2010 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <climits> - -#include "mongo/bson/optime.h" -#include "mongo/db/repl/member_state.h" -#include "mongo/util/concurrency/value.h" - -namespace mongo { -namespace repl { - - /* this is supposed to be just basic information on a member, - and copy constructable. */ - class HeartbeatInfo { - public: - HeartbeatInfo(); - HeartbeatInfo(int id); - int id() const { return _id; } - MemberState hbstate; - double health; - time_t upSince; - long long downSince; - // This is the last time we got a response from a heartbeat request to a given member. - time_t lastHeartbeat; - // This is the last time we got a heartbeat request from a given member. - time_t lastHeartbeatRecv; - DiagStr lastHeartbeatMsg; - DiagStr syncingTo; - OpTime opTime; - int skew; - bool authIssue; - unsigned int ping; // milliseconds - static unsigned int numPings; - - // Time node was elected primary - OpTime electionTime; - - bool up() const { return health > 0; } - - /** health is set to -1 on startup. that means we haven't even checked yet. 0 means we - * checked and it failed. - */ - bool maybeUp() const { return health != 0; } - - /* true if changed in a way of interest to the repl set manager. */ - bool changed(const HeartbeatInfo& old) const; - - /** - * Updates this with the info received from the command result we got from - * the last replSetHeartbeat. - */ - void updateFromLastPoll(const HeartbeatInfo& newInfo); - private: - int _id; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index c35efafec10..d53fa12026f 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -35,7 +35,6 @@ #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replset_commands.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/repl/rslog.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/repl/manager.cpp b/src/mongo/db/repl/manager.cpp deleted file mode 100644 index 6c8a9519e70..00000000000 --- a/src/mongo/db/repl/manager.cpp +++ /dev/null @@ -1,303 +0,0 @@ -/* @file manager.cpp -*/ - -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful,b -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/isself.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/db/client.h" -#include "mongo/util/log.h" - -namespace mongo { - -namespace repl { - - /* check members OTHER THAN US to see if they think they are primary */ - const Member * Manager::findOtherPrimary(OperationContext* txn, bool& two) { - two = false; - Member *m = rs->head(); - Member *p = 0; - while( m ) { - DEV verify( m != rs->_self ); - if( m->state().primary() && m->hbinfo().up() ) { - if( p ) { - two = true; - return 0; - } - p = m; - } - m = m->next(); - } - if( p ) - noteARemoteIsPrimary(txn, p); - return p; - } - - Manager::Manager(ReplSetImpl *_rs) : - task::Server("rsMgr"), rs(_rs), busyWithElectSelf(false) { - } - - Manager::~Manager() { - /* we don't destroy the replset object we sit in; however, the destructor could have thrown on init. - the log message below is just a reminder to come back one day and review this code more, and to - make it cleaner. - */ - log() << "info: ~Manager called" << rsLog; - rs->mgr = 0; - } - - void Manager::starting() { - Client::initThread("rsMgr"); - cc().getAuthorizationSession()->grantInternalAuthorization(); - } - - void Manager::noteARemoteIsPrimary(OperationContext* txn, const Member *m) { - if( rs->box.getPrimary() == m ) - return; - rs->_self->lhb() = ""; - - // this is what actually puts arbiters into ARBITER state - if( rs->iAmArbiterOnly() ) { - rs->box.set(MemberState::RS_ARBITER, m); - return; - } - - if (rs->box.getState().primary()) { - OpTime remoteElectionTime = m->hbinfo().electionTime; - LOG(1) << "another primary seen with election time " << remoteElectionTime; - if (remoteElectionTime == OpTime()) { - // This primary didn't deliver an electionTime in its heartbeat; - // assume it's a pre-2.6 primary and always step down ourselves. - log() << "stepping down; another primary seen in replicaset"; - rs->relinquish(txn); - } - // 2.6 or greater primary. Step down whoever has the older election time. - else if (remoteElectionTime > rs->getElectionTime()) { - log() << "stepping down; another primary was elected more recently"; - rs->relinquish(txn); - } - else { - // else, stick around - log() << "another PRIMARY detected but it should step down" - " since it was elected earlier than me"; - return; - } - } - - rs->box.noteRemoteIsPrimary(m); - } - - void Manager::checkElectableSet(OperationContext* txn) { - unsigned otherOp = rs->lastOtherOpTime().getSecs(); - - // make sure the electable set is up-to-date - if (rs->elect.aMajoritySeemsToBeUp() && - rs->iAmPotentiallyHot() && - (otherOp == 0 || rs->lastOpTimeWritten.getSecs() >= otherOp - 10)) { - theReplSet->addToElectable(rs->selfId()); - } - else { - theReplSet->rmFromElectable(rs->selfId()); - } - - // check if we should ask the primary (possibly ourselves) to step down - const Member *highestPriority = theReplSet->getMostElectable(); - const Member *primary = rs->box.getPrimary(); - - if (primary && highestPriority && - highestPriority->config().priority > primary->config().priority && - // if we're stepping down to allow another member to become primary, we - // better have another member (otherOp), and it should be up-to-date - otherOp != 0 && highestPriority->hbinfo().opTime.getSecs() >= otherOp - 10) { - log() << "stepping down " << primary->fullName() << " (priority " << - primary->config().priority << "), " << highestPriority->fullName() << - " is priority " << highestPriority->config().priority << " and " << - (otherOp - highestPriority->hbinfo().opTime.getSecs()) << " seconds behind" << endl; - - if (isSelf(primary->h())) { - // replSetStepDown tries to acquire the same lock - // msgCheckNewState takes, so we can't call replSetStepDown on - // ourselves. - rs->relinquish(txn); - } - else { - BSONObj cmd = BSON( "replSetStepDown" << 1 ); - ScopedConn conn(primary->fullName()); - BSONObj result; - - try { - if (!conn.runCommand("admin", cmd, result, 0)) { - log() << "stepping down " << primary->fullName() - << " failed: " << result << endl; - } - } - catch (DBException &e) { - log() << "stepping down " << primary->fullName() << " threw exception: " - << e.toString() << endl; - } - } - } - } - - bool Manager::shouldBeRecoveringDueToAuthIssue() { - int down = 0, authIssue = 0, total = 0; - - for( Member *m = rs->head(); m; m=m->next() ) { - total++; - - // all authIssue servers will also be not up - if (!m->hbinfo().up()) { - down++; - if (m->hbinfo().authIssue) { - authIssue++; - } - } - } - - // if all nodes are down or failed auth AND at least one failed - // auth, go into recovering. If all nodes are down, stay a - // secondary. - return authIssue > 0 && down == total; - } - - void Manager::checkAuth(OperationContext* txn) { - if (shouldBeRecoveringDueToAuthIssue()) { - log() << "replset error could not reach/authenticate against any members" << endl; - - if (rs->box.getPrimary() == rs->_self) { - log() << "auth problems, relinquishing primary" << rsLog; - rs->relinquish(txn); - } - - rs->changeState(MemberState::RS_RECOVERING); - } - } - - /** called as the health threads get new results */ - void Manager::msgCheckNewState() { - { - OperationContextImpl txn; - RSBase::lock lk(rs); - - if( busyWithElectSelf ) return; - - checkElectableSet(&txn); - checkAuth(&txn); - - const Member *p = rs->box.getPrimary(); - - if (p && p->id() != rs->_self->id()) { - if (!p->hbinfo().up() || !p->hbinfo().hbstate.primary()) { - p = 0; - rs->box.setOtherPrimary(0); - } - } - - const Member *p2; - { - bool two; - p2 = findOtherPrimary(&txn, two); - if( two ) { - /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ - log() << "replSet info two primaries (transiently)" << rsLog; - return; - } - } - - if( p2 ) { - noteARemoteIsPrimary(&txn, p2); - return; - } - - /* didn't find anyone who wants to be primary */ - - if( p ) { - /* we are already primary */ - - if( p != rs->_self ) { - rs->sethbmsg("error p != rs->self in checkNewState"); - log() << "replSet " << p->fullName() << rsLog; - log() << "replSet " << rs->_self->fullName() << rsLog; - return; - } - - if( rs->elect.shouldRelinquish() ) { - log() << "can't see a majority of the set, relinquishing primary" << rsLog; - rs->relinquish(&txn); - } - - return; - } - - if( !rs->iAmPotentiallyHot() ) { // if not we never try to be primary - OCCASIONALLY log() << "replSet I don't see a primary and I can't elect myself" << endl; - return; - } - - /* no one seems to be primary. shall we try to elect ourself? */ - if( !rs->elect.aMajoritySeemsToBeUp() ) { - static time_t last; - static int n; - int ll = 0; - if( ++n > 5 ) ll++; - if( last + 60 > time(0 ) ) ll++; - LOG(ll) << "replSet can't see a majority, will not try to elect self" << rsLog; - last = time(0); - return; - } - - if( !rs->iAmElectable() ) { - return; - } - - busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one. - } - try { - rs->elect.electSelf(); - } - catch(RetryAfterSleepException&) { - /* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */ - requeue(); - } - catch(...) { - log() << "replSet error unexpected assertion in rs manager" << rsLog; - } - busyWithElectSelf = false; - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/manager.h b/src/mongo/db/repl/manager.h deleted file mode 100644 index d16fa1e980f..00000000000 --- a/src/mongo/db/repl/manager.h +++ /dev/null @@ -1,65 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/jsobj.h" -#include "mongo/db/repl/server.h" - -namespace mongo { - - class OperationContext; - -namespace repl { - class Member; - class ReplSetImpl; - - class Manager : public task::Server { - ReplSetImpl *rs; - bool busyWithElectSelf; - - /** @param two - if true two primaries were seen. this can happen transiently, in addition - * to our polling being only occasional. in this case null is returned, but the caller - * should not assume primary itself in that situation. - */ - const Member* findOtherPrimary(OperationContext* txn, bool& two); - - void noteARemoteIsPrimary(OperationContext* txn, const Member *); - void checkElectableSet(OperationContext* txn); - void checkAuth(OperationContext* txn); - virtual void starting(); - public: - Manager(ReplSetImpl *rs); - virtual ~Manager(); - void msgReceivedNewConfig(BSONObj); - void msgCheckNewState(); - bool shouldBeRecoveringDueToAuthIssue(); - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 6d41eb4e63f..3734b064c72 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -57,10 +57,11 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" // replLocalAuth() +#include "mongo/db/repl/sync.h" #include "mongo/db/server_parameters.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/repl/member.cpp b/src/mongo/db/repl/member.cpp deleted file mode 100644 index fd953519c00..00000000000 --- a/src/mongo/db/repl/member.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include "mongo/db/repl/member.h" - -#include "mongo/db/repl/rs.h" -#include "mongo/util/mongoutils/html.h" - - -namespace mongo { -namespace repl { - - using namespace html; - - void Member::summarizeMember(stringstream& s) const { - s << tr(); - { - stringstream u; - u << "http://" << h().host() << ':' << (h().port() + 1000) << "/_replSet"; - s << td( a(u.str(), "", fullName()) ); - } - s << td( id() ); - double h = hbinfo().health; - bool ok = h > 0; - s << td(red(str::stream() << h,h == 0)); - s << td(ago(hbinfo().upSince)); - bool never = false; - { - string h; - time_t hb = hbinfo().lastHeartbeat; - if( hb == 0 ) { - h = "never"; - never = true; - } - else h = ago(hb) + " ago"; - s << td(h); - } - s << td(config().votes); - s << td(config().priority); - { - string stateText = state().toString(); - if( _config.hidden ) - stateText += " (hidden)"; - if( ok || stateText.empty() ) - s << td(stateText); // text blank if we've never connected - else - s << td( grey(str::stream() << "(was " << state().toString() << ')', true) ); - } - s << td( grey(hbinfo().lastHeartbeatMsg,!ok) ); - stringstream q; - q << "/_replSetOplog?_id=" << id(); - s << td( a(q.str(), "", never ? "?" : hbinfo().opTime.toString()) ); - if( hbinfo().skew > INT_MIN ) { - s << td( grey(str::stream() << hbinfo().skew,!ok) ); - } - else - s << td(""); - s << _tr(); - } - - bool Member::syncable() const { - bool buildIndexes = theReplSet ? theReplSet->buildIndexes() : true; - return hbinfo().up() && (config().buildIndexes || !buildIndexes) && state().readable(); - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/member.h b/src/mongo/db/repl/member.h deleted file mode 100644 index d845471a4a1..00000000000 --- a/src/mongo/db/repl/member.h +++ /dev/null @@ -1,81 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include <sstream> - -#include "mongo/db/repl/heartbeat_info.h" -#include "mongo/db/repl/rs_config.h" -#include "mongo/util/concurrency/list.h" - -namespace mongo { -namespace repl { - - /* member of a replica set */ - class Member : public List1<Member>::Base { - private: - ~Member(); // intentionally unimplemented as should never be called -- see List1<>::Base. - Member(const Member&); - public: - Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : - _config(*c), - _h(h), - _hbinfo(ord) { - verify(c); - if( self ) - _hbinfo.health = 1.0; - } - - - string fullName() const { return h().toString(); } - const ReplSetConfig::MemberCfg& config() const { return _config; } - ReplSetConfig::MemberCfg& configw() { return _config; } - const HeartbeatInfo& hbinfo() const { return _hbinfo; } - HeartbeatInfo& get_hbinfo() { return _hbinfo; } - string lhb() const { return _hbinfo.lastHeartbeatMsg; } - MemberState state() const { return _hbinfo.hbstate; } - const HostAndPort& h() const { return _h; } - unsigned id() const { return _hbinfo.id(); } - - // not arbiter, not priority 0 - bool potentiallyHot() const { return _config.potentiallyHot(); } - void summarizeMember(std::stringstream& s) const; - // If we could sync from this member. This doesn't tell us anything about the quality of - // this member, just if they are a possible sync target. - bool syncable() const; - - private: - friend class ReplSetImpl; - ReplSetConfig::MemberCfg _config; - const HostAndPort _h; - HeartbeatInfo _hbinfo; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/multicmd.cpp b/src/mongo/db/repl/multicmd.cpp index 815a4b6bb7b..cb0a41345f6 100644 --- a/src/mongo/db/repl/multicmd.cpp +++ b/src/mongo/db/repl/multicmd.cpp @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/multicmd.h" +#include "mongo/db/repl/scoped_conn.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/repl/multicmd.h b/src/mongo/db/repl/multicmd.h index 1705ad17580..d42dd31d92e 100644 --- a/src/mongo/db/repl/multicmd.h +++ b/src/mongo/db/repl/multicmd.h @@ -30,7 +30,9 @@ #pragma once -#include "mongo/db/repl/connections.h" +#include <list> + +#include "mongo/db/jsobj.h" #include "mongo/util/background.h" namespace mongo { diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/db/repl/network_interface_impl.cpp index 483d7a29487..d84d2148210 100644 --- a/src/mongo/db/repl/network_interface_impl.cpp +++ b/src/mongo/db/repl/network_interface_impl.cpp @@ -41,7 +41,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/connections.h" // For ScopedConn::keepOpen +#include "mongo/db/repl/scoped_conn.h" #include "mongo/platform/unordered_map.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/list.h" diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 7ee8afa4aaf..f6c0ba0da21 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -56,7 +56,6 @@ #include "mongo/db/ops/delete.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/write_concern.h" #include "mongo/db/stats/counters.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" @@ -95,7 +94,6 @@ namespace repl { localDB = NULL; localOplogMainCollection = NULL; localOplogRSCollection = NULL; - resetSlaveCache(); } // so we can fail the same way diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp index 84ba00634a0..fbd2b03d04b 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp @@ -46,11 +46,11 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/connections.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/master_slave.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/scoped_conn.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index c05f34ad5a5..b4813d6dc47 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/db/global_optime.h" +#include "mongo/db/index/index_descriptor.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/check_quorum_for_config_change.h" #include "mongo/db/repl/elect_cmd_runner.h" @@ -50,7 +51,6 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replica_set_config_checks.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp deleted file mode 100644 index b23442f1238..00000000000 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ /dev/null @@ -1,1241 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/repl_coordinator_legacy.h" - -#include <boost/thread/thread.hpp> - -#include "mongo/base/status.h" -#include "mongo/bson/optime.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/instance.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/handshake_args.h" -#include "mongo/db/repl/heartbeat.h" -#include "mongo/db/repl/is_master_response.h" -#include "mongo/db/repl/isself.h" -#include "mongo/db/repl/master_slave.h" -#include "mongo/db/repl/member.h" -#include "mongo/db/repl/oplog.h" // for newRepl() -#include "mongo/db/repl/repl_coordinator_external_state_impl.h" -#include "mongo/db/repl/repl_set_heartbeat_args.h" -#include "mongo/db/repl/repl_set_heartbeat_response.h" -#include "mongo/db/repl/repl_set_seed_list.h" -#include "mongo/db/repl/repl_settings.h" -#include "mongo/db/repl/replset_commands.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rs_config.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/db/repl/update_position_args.h" -#include "mongo/db/repl/write_concern.h" -#include "mongo/db/write_concern_options.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" - -namespace mongo { - -namespace repl { - - LegacyReplicationCoordinator::LegacyReplicationCoordinator(const ReplSettings& settings) : - _maintenanceMode(0), _settings(settings) { - // this is ok but micros or combo with some rand() and/or 64 bits might be better -- - // imagine a restart and a clock correction simultaneously (very unlikely but possible...) - _rbid = (int) curTimeMillis64(); - } - LegacyReplicationCoordinator::~LegacyReplicationCoordinator() {} - - void LegacyReplicationCoordinator::startReplication(OperationContext* txn) { - if (!isReplEnabled()) { - return; - } - - ReplicationCoordinatorExternalStateImpl externalState; - _myRID = externalState.ensureMe(txn); - - // if we are going to be a replica set, we aren't doing other forms of replication. - if (!_settings.replSet.empty()) { - if (_settings.slave || _settings.master) { - log() << "***" << endl; - log() << "ERROR: can't use --slave or --master replication options with --replSet"; - log() << "***" << endl; - } - - ReplSetSeedList *replSetSeedList = new ReplSetSeedList(&externalState, - _settings.replSet); - boost::thread t(stdx::bind(&startReplSets, replSetSeedList)); - } else { - startMasterSlave(txn); - } - } - - void LegacyReplicationCoordinator::shutdown() { - } - - ReplSettings& LegacyReplicationCoordinator::getSettings() { - return _settings; - } - - ReplicationCoordinator::Mode LegacyReplicationCoordinator::getReplicationMode() const { - if (theReplSet) { - return modeReplSet; - } else if (_settings.slave || _settings.master) { - return modeMasterSlave; - } - return modeNone; - } - - MemberState LegacyReplicationCoordinator::getCurrentMemberState() const { - invariant(getReplicationMode() == modeReplSet); - return theReplSet->state(); - } - - Seconds LegacyReplicationCoordinator::getSlaveDelaySecs() const { - invariant(getReplicationMode() == modeReplSet); - return Seconds(theReplSet->myConfig().slaveDelay); - } - - void LegacyReplicationCoordinator::clearSyncSourceBlacklist() { - theReplSet->clearVetoes(); - } - - ReplicationCoordinator::StatusAndDuration LegacyReplicationCoordinator::awaitReplication( - const OperationContext* txn, - const OpTime& ts, - const WriteConcernOptions& writeConcern) { - - Timer timeoutTimer; - - if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) { - // no desired replication check - return StatusAndDuration(Status::OK(), Milliseconds(timeoutTimer.millis())); - } - - const Mode replMode = getReplicationMode(); - if (replMode == modeNone || serverGlobalParams.configsvr) { - // no replication check needed (validated above) - return StatusAndDuration(Status::OK(), Milliseconds(timeoutTimer.millis())); - } - - if (writeConcern.wMode == "majority" && replMode == modeMasterSlave) { - // with master/slave, majority is equivalent to w=1 - return StatusAndDuration(Status::OK(), Milliseconds(timeoutTimer.millis())); - } - - if (ts.isNull()) { - // If waiting for the empty optime, always say it's been replicated. - return StatusAndDuration(Status::OK(), Milliseconds(timeoutTimer.millis())); - } - - try { - while (1) { - if (!writeConcern.wMode.empty()) { - if (opReplicatedEnough(ts, writeConcern.wMode)) { - return StatusAndDuration(Status::OK(), Milliseconds(timeoutTimer.millis())); - } - } - else if (opReplicatedEnough(ts, writeConcern.wNumNodes)) { - return StatusAndDuration(Status::OK(), Milliseconds(timeoutTimer.millis())); - } - - if (writeConcern.wTimeout > 0 && timeoutTimer.millis() >= writeConcern.wTimeout) { - return StatusAndDuration(Status(ErrorCodes::ExceededTimeLimit, - "waiting for replication timed out"), - Milliseconds(timeoutTimer.millis())); - } - - if (writeConcern.wTimeout == -1) { - return StatusAndDuration(Status(ErrorCodes::ExceededTimeLimit, - "replication not finished when checked"), - Milliseconds(timeoutTimer.millis())); - } - - // TODO (dannenberg) is this the best sleep amount? - sleepmillis(1); - txn->checkForInterrupt(); - } - - } - catch (const DBException& ex) { - return StatusAndDuration(ex.toStatus(), Milliseconds(timeoutTimer.millis())); - } - } - - ReplicationCoordinator::StatusAndDuration - LegacyReplicationCoordinator::awaitReplicationOfLastOpForClient( - const OperationContext* txn, - const WriteConcernOptions& writeConcern) { - return awaitReplication(txn, txn->getClient()->getLastOp(), writeConcern); - } - - ReplicationCoordinator::StatusAndDuration - LegacyReplicationCoordinator::awaitReplicationOfLastOpApplied( - const OperationContext* txn, - const WriteConcernOptions& writeConcern) { - return awaitReplication(txn, theReplSet->lastOpTimeWritten, writeConcern); - } - -namespace { - /** - * Waits up to timeout milliseconds for one secondary to get within threshold milliseconds - * of us. - */ - Status _waitForSecondary(const ReplicationCoordinator::Milliseconds& timeout, - const ReplicationCoordinator::Milliseconds& threshold) { - if (theReplSet->getConfig().members.size() <= 1) { - return Status(ErrorCodes::ExceededTimeLimit, - mongoutils::str::stream() << "no secondaries within " << - threshold.total_seconds() << " seconds of my optime"); - } - - long long timeoutTime, now, start; - timeoutTime = now = start = curTimeMillis64()/1000; - timeoutTime += timeout.total_seconds(); - - OpTime lastOp = repl::theReplSet->lastOpTimeWritten; - OpTime closest = repl::theReplSet->lastOtherElectableOpTime(); - long long int diff = lastOp.getSecs() - closest.getSecs(); - while (now <= timeoutTime && (diff < 0 || diff > threshold.total_seconds())) { - sleepsecs(1); - now = curTimeMillis64() / 1000; - - lastOp = repl::theReplSet->lastOpTimeWritten; - closest = repl::theReplSet->lastOtherElectableOpTime(); - diff = lastOp.getSecs() - closest.getSecs(); - } - - if (diff < 0) { - // not our problem but we'll wait until things settle down - return Status(ErrorCodes::SecondaryAheadOfPrimary, - "someone is ahead of the primary?"); - } - if (diff > threshold.total_seconds()) { - return Status(ErrorCodes::ExceededTimeLimit, - mongoutils::str::stream() << "no secondaries within " << - threshold.total_seconds() << " seconds of my optime"); - } - return Status::OK(); - } -} // namespace - - Status LegacyReplicationCoordinator::stepDown(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime) { - invariant(getReplicationMode() == modeReplSet); - if (!getCurrentMemberState().primary()) { - return Status(ErrorCodes::NotMaster, "not primary so can't step down"); - } - - if (!force) { - Status status = _waitForSecondary(waitTime, Milliseconds(10 * 1000)); - if (!status.isOK()) { - return status; - } - } - - // step down - bool worked = repl::theReplSet->stepDown(txn, stepdownTime.total_seconds()); - if (!worked) { - return Status(ErrorCodes::NotMaster, "not primary so can't step down"); - } - return Status::OK(); - } - - bool LegacyReplicationCoordinator::isMasterForReportingPurposes() { - // we must check replSet since because getReplicationMode() isn't aware of modeReplSet - // until theReplSet is initialized - if (_settings.usingReplSets()) { - if (theReplSet && getCurrentMemberState().primary()) { - return true; - } - return false; - } - - if (!_settings.slave) - return true; - - if (replAllDead) { - return false; - } - - if (_settings.master) { - // if running with --master --slave, allow. - return true; - } - - return false; - } - - bool LegacyReplicationCoordinator::canAcceptWritesForDatabase(const StringData& dbName) { - // we must check replSet since because getReplicationMode() isn't aware of modeReplSet - // until theReplSet is initialized - if (_settings.usingReplSets()) { - if (theReplSet && getCurrentMemberState().primary()) { - return true; - } - return dbName == "local"; - } - - if (!_settings.slave) - return true; - - // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed - if (replAllDead) { - return dbName == "local"; - } - - if (_settings.master) { - // if running with --master --slave, allow. - return true; - } - - return dbName == "local"; - } - - Status LegacyReplicationCoordinator::checkCanServeReadsFor(OperationContext* txn, - const NamespaceString& ns, - bool slaveOk) { - if (txn->isGod()) { - return Status::OK(); - } - if (canAcceptWritesForDatabase(ns.db())) { - return Status::OK(); - } - if (getReplicationMode() == modeMasterSlave && _settings.slave == SimpleSlave) { - return Status::OK(); - } - if (slaveOk) { - if (getReplicationMode() == modeMasterSlave || getReplicationMode() == modeNone) { - return Status::OK(); - } - if (getCurrentMemberState().secondary()) { - return Status::OK(); - } - return Status(ErrorCodes::NotMasterOrSecondaryCode, - "not master or secondary; cannot currently read from this replSet member"); - } - return Status(ErrorCodes::NotMasterNoSlaveOkCode, - "not master and slaveOk=false"); - - } - - bool LegacyReplicationCoordinator::shouldIgnoreUniqueIndex(const IndexDescriptor* idx) { - if (!idx->unique()) { - return false; - } - if (!theReplSet) { - return false; - } - // see SERVER-6671 - MemberState ms = theReplSet->state(); - if (! ((ms == MemberState::RS_STARTUP2) || - (ms == MemberState::RS_RECOVERING) || - (ms == MemberState::RS_ROLLBACK))) { - return false; - } - // Never ignore _id index - if (idx->isIdIndex()) { - return false; - } - - return true; - } - - Status LegacyReplicationCoordinator::setLastOptimeForSlave(OperationContext* txn, - const OID& rid, - const OpTime& ts) { - invariant(getReplicationMode() == modeMasterSlave); - return _setLastOptime(txn, rid, ts); - - } - - Status LegacyReplicationCoordinator::_setLastOptime(OperationContext* txn, - const OID& rid, - const OpTime& ts) { - { - boost::lock_guard<boost::mutex> lock(_mutex); - SlaveOpTimeMap::const_iterator it(_slaveOpTimeMap.find(rid)); - - if (rid != getMyRID()) { - if ((it != _slaveOpTimeMap.end()) && (ts <= it->second)) { - // Only update if ts is newer than what we have already - return Status::OK(); - } - - BSONObj config; - if (getReplicationMode() == modeReplSet) { - invariant(_ridMemberMap.count(rid)); - Member* mem = _ridMemberMap[rid]; - invariant(mem); - config = BSON("_id" << mem->id() << "host" << mem->h().toString()); - } - else if (getReplicationMode() == modeMasterSlave){ - config = BSON("host" << txn->getClient()->getRemote().toString()); - } - LOG(2) << "received notification that node with RID " << rid << " and config " - << config << " has reached optime: " << ts; - - // This is what updates the progress information used for satisfying write concern - // and wakes up threads waiting for replication. - if (!updateSlaveTracking(BSON("_id" << rid), config, ts)) { - return Status(ErrorCodes::NodeNotFound, - str::stream() << "could not update node with _id: " - << config["_id"].Int() << " and RID " << rid - << " because it cannot be found in current ReplSetConfig " - << theReplSet->getConfig().toString()); - } - } - - // This updates the _slaveOpTimeMap which is used for forwarding slave progress - // upstream in chained replication. - LOG(2) << "Updating our knowledge of the replication progress for node with RID " << - rid << " to be at optime " << ts; - _slaveOpTimeMap[rid] = ts; - } - - if (getReplicationMode() == modeReplSet && !getCurrentMemberState().primary()) { - // pass along if we are not primary - theReplSet->syncSourceFeedback.forwardSlaveProgress(); - } - return Status::OK(); - } - - Status LegacyReplicationCoordinator::setMyLastOptime(OperationContext* txn, const OpTime& ts) { - if (getReplicationMode() == modeReplSet) { - theReplSet->lastOpTimeWritten = ts; - } - return _setLastOptime(txn, _myRID, ts); - } - - void LegacyReplicationCoordinator::setMyHeartbeatMessage(const std::string& msg) { - if (getReplicationMode() == modeReplSet) { - theReplSet->sethbmsg(msg, 0); - } - } - - OpTime LegacyReplicationCoordinator::getMyLastOptime() const { - boost::lock_guard<boost::mutex> lock(_mutex); - - SlaveOpTimeMap::const_iterator it(_slaveOpTimeMap.find(_myRID)); - if (it == _slaveOpTimeMap.end()) { - return OpTime(0,0); - } - OpTime legacyMapOpTime = it->second; - OpTime legacyOpTime = theReplSet->lastOpTimeWritten; - // TODO(emilkie): SERVER-15209 - // This currently fails because a PRIMARY can see an old optime for itself - // come through the spanning tree, which updates the slavemap but not the variable. - // replsets_priority1.js is a test that hits this condition (sometimes) and fails. - //fassert(18695, legacyOpTime == legacyMapOpTime); - return legacyOpTime; - } - - OID LegacyReplicationCoordinator::getElectionId() { - return theReplSet->getElectionId(); - } - - - OID LegacyReplicationCoordinator::getMyRID() const { - return _myRID; - } - - int LegacyReplicationCoordinator::getMyId() const { - invariant(theReplSet); - return theReplSet->myConfig()._id; - } - - bool LegacyReplicationCoordinator::setFollowerMode(const MemberState& newState) { - if (newState.secondary() && - theReplSet->state().recovering() && - theReplSet->mgr->shouldBeRecoveringDueToAuthIssue()) { - // If tryToGoLiveAsSecondary is trying to take us from RECOVERING to SECONDARY, but we - // still have an authIssue, don't actually change states. - return false; - } - theReplSet->changeState(newState); - return true; - } - - bool LegacyReplicationCoordinator::isWaitingForApplierToDrain() { - return BackgroundSync::get()->isAssumingPrimary_inlock(); - } - - void LegacyReplicationCoordinator::signalDrainComplete() { - // nothing further to do - } - - void LegacyReplicationCoordinator::signalUpstreamUpdater() { - theReplSet->syncSourceFeedback.forwardSlaveHandshake(); - } - - void LegacyReplicationCoordinator::prepareReplSetUpdatePositionCommand( - OperationContext* txn, - BSONObjBuilder* cmdBuilder) { - invariant(getReplicationMode() == modeReplSet); - boost::lock_guard<boost::mutex> lock(_mutex); - cmdBuilder->append("replSetUpdatePosition", 1); - // create an array containing objects each member connected to us and for ourself - BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); - OID myID = getMyRID(); - { - for (SlaveOpTimeMap::const_iterator itr = _slaveOpTimeMap.begin(); - itr != _slaveOpTimeMap.end(); ++itr) { - const OID& rid = itr->first; - BSONObjBuilder entry(arrayBuilder.subobjStart()); - entry.append("_id", rid); - entry.append("optime", itr->second); - // SERVER-14550 Even though the "config" field isn't used on the other end in 2.8, - // we need to keep sending it for 2.6 compatibility. - // TODO(spencer): Remove this after 2.8 is released. - if (rid == myID) { - entry.append("config", theReplSet->myConfig().asBson()); - } - else { - Member* member = _ridMemberMap[rid]; - invariant(member); - BSONObj config = member->config().asBson(); - entry.append("config", config); - } - } - } - } - - void LegacyReplicationCoordinator::prepareReplSetUpdatePositionCommandHandshakes( - OperationContext* txn, - std::vector<BSONObj>* handshakes) { - invariant(getReplicationMode() == modeReplSet); - boost::lock_guard<boost::mutex> lock(_mutex); - // handshake obj for us - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - BSONObjBuilder sub (cmd.subobjStart("handshake")); - sub.append("handshake", getMyRID()); - sub.append("member", theReplSet->selfId()); - sub.append("config", theReplSet->myConfig().asBson()); - sub.doneFast(); - handshakes->push_back(cmd.obj()); - - // handshake objs for all chained members - for (OIDMemberMap::const_iterator itr = _ridMemberMap.begin(); - itr != _ridMemberMap.end(); ++itr) { - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - // outer handshake indicates this is a handshake command - // inner is needed as part of the structure to be passed to gotHandshake - BSONObjBuilder subCmd (cmd.subobjStart("handshake")); - subCmd.append("handshake", itr->first); - subCmd.append("member", itr->second->id()); - subCmd.append("config", itr->second->config().asBson()); - subCmd.doneFast(); - handshakes->push_back(cmd.obj()); - } - } - - Status LegacyReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) { - theReplSet->summarizeStatus(*result); - // NOTE: The following field is for debugging only and not part of the interface. - result->append("replCoord", "legacy"); - return Status::OK(); - } - - void LegacyReplicationCoordinator::fillIsMasterForReplSet(IsMasterResponse* result) { - invariant(getSettings().usingReplSets()); - if (getReplicationMode() != ReplicationCoordinator::modeReplSet - || getCurrentMemberState().removed()) { - result->markAsNoConfig(); - } - else { - BSONObjBuilder resultBuilder; - theReplSet->fillIsMaster(resultBuilder); - Status status = result->initialize(resultBuilder.done()); - fassert(18821, status); - } - } - - void LegacyReplicationCoordinator::appendSlaveInfoData(BSONObjBuilder* result) { - boost::lock_guard<boost::mutex> lock(_mutex); - BSONArrayBuilder slaves(result->subarrayStart("slaves")); - { - for (SlaveOpTimeMap::const_iterator itr = _slaveOpTimeMap.begin(); - itr != _slaveOpTimeMap.end(); ++itr) { - BSONObjBuilder entry(slaves.subobjStart()); - entry.append("rid", itr->first); - entry.append("optime", itr->second); - if (getReplicationMode() == modeReplSet) { - Member* member = _ridMemberMap[itr->first]; - invariant(member); - entry.append("host", member->h().toString()); - entry.append("memberID", member->id()); - } - } - } - } - - void LegacyReplicationCoordinator::processReplSetGetConfig(BSONObjBuilder* result) { - result->append("config", theReplSet->config().asBson()); - } - - bool LegacyReplicationCoordinator::_setMaintenanceMode_inlock(OperationContext* txn, - bool activate) { - if (theReplSet->state().primary()) { - return false; - } - - if (activate) { - log() << "replSet going into maintenance mode (" << _maintenanceMode - << " other tasks)" << rsLog; - - _maintenanceMode++; - theReplSet->changeState(MemberState::RS_RECOVERING); - } - else if (_maintenanceMode > 0) { - _maintenanceMode--; - // no need to change state, syncTail will try to go live as a secondary soon - - log() << "leaving maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; - } - else { - return false; - } - - fassert(16844, _maintenanceMode >= 0); - return true; - } - - Status LegacyReplicationCoordinator::setMaintenanceMode(OperationContext* txn, bool activate) { - // Lock here to prevent state from changing between checking the state and changing it - Lock::GlobalWrite writeLock(txn->lockState()); - boost::lock_guard<boost::mutex> lock(_mutex); - - if (!_setMaintenanceMode_inlock(txn, activate)) { - if (theReplSet->isPrimary()) { - return Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode"); - } - else { - return Status(ErrorCodes::OperationFailed, "already out of maintenance mode"); - } - } - return Status::OK(); - } - - bool LegacyReplicationCoordinator::getMaintenanceMode() { - boost::lock_guard<boost::mutex> lock(_mutex); - return _maintenanceMode > 0; - } - - Status LegacyReplicationCoordinator::processHeartbeat(const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response) { - if (args.getProtocolVersion() != 1) { - return Status(ErrorCodes::BadValue, "incompatible replset protocol version"); - } - - { - if (_settings.ourSetName() != args.getSetName()) { - log() << "replSet set names do not match, our cmdline: " << _settings.replSet - << rsLog; - log() << "replSet s: " << args.getSetName() << rsLog; - response->noteMismatched(); - return Status(ErrorCodes::InconsistentReplicaSetNames, - "repl set names do not match"); - } - } - - response->noteReplSet(); - if( (theReplSet == 0) || (theReplSet->startupStatus == ReplSetImpl::LOADINGCONFIG) ) { - if (!args.getSenderHost().empty()) { - scoped_lock lck( _settings.discoveredSeeds_mx ); - _settings.discoveredSeeds.insert(args.getSenderHost().toString()); - } - response->setHbMsg("still initializing"); - return Status::OK(); - } - - if (theReplSet->name() != args.getSetName()) { - response->noteMismatched(); - return Status(ErrorCodes::InconsistentReplicaSetNames, - "repl set names do not match (2)"); - } - response->setSetName(theReplSet->name()); - - MemberState currentState = theReplSet->state(); - response->setState(currentState.s); - if (currentState == MemberState::RS_PRIMARY) { - response->setElectionTime(theReplSet->getElectionTime().asDate()); - } - - response->setElectable(theReplSet->iAmElectable()); - response->setHbMsg(theReplSet->hbmsg()); - response->setTime(Seconds(time(0))); - response->setOpTime(theReplSet->lastOpTimeWritten.asDate()); - const HostAndPort syncTarget = BackgroundSync::get()->getSyncTarget(); - if (!syncTarget.empty()) { - response->setSyncingTo(syncTarget.toString()); - } - - int v = theReplSet->config().version; - response->setVersion(v); - if (v > args.getConfigVersion()) { - ReplicaSetConfig config; - fassert(18635, config.initialize(theReplSet->config().asBson())); - response->setConfig(config); - } - - Member* from = NULL; - if (v == args.getConfigVersion() && args.getSenderId() != -1) { - from = theReplSet->getMutableMember(args.getSenderId()); - } - if (!from) { - from = theReplSet->findByName(args.getSenderHost().toString()); - if (!from) { - return Status::OK(); - } - } - - // if we thought that this node is down, let it know - if (!from->hbinfo().up()) { - response->noteStateDisagreement(); - } - - // note that we got a heartbeat from this node - theReplSet->mgr->send(stdx::bind(&ReplSet::msgUpdateHBRecv, - theReplSet, from->hbinfo().id(), time(0))); - - - return Status::OK(); - } - - Status LegacyReplicationCoordinator::checkReplEnabledForCommand(BSONObjBuilder* result) { - if (!_settings.usingReplSets()) { - if (serverGlobalParams.configsvr) { - result->append("info", "configsvr"); // for shell prompt - } - return Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet"); - } - - if( theReplSet == 0 ) { - result->append("startupStatus", ReplSet::startupStatus); - if( ReplSet::startupStatus == ReplSet::EMPTYCONFIG ) - result->append("info", "run rs.initiate(...) if not yet done for the set"); - return Status(ErrorCodes::NotYetInitialized, ReplSet::startupStatusMsg.empty() ? - "replset unknown error 2" : ReplSet::startupStatusMsg.get()); - } - - return Status::OK(); - } - -namespace { - void _checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial) { - int failures = 0, allVotes = 0, allowableFailures = 0; - int me = 0; - stringstream selfs; - for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { - if (isSelf(i->h)) { - me++; - if( me > 1 ) - selfs << ','; - selfs << i->h.toString(); - if( !i->potentiallyHot() ) { - uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary"); - } - } - allVotes += i->votes; - } - allowableFailures = allVotes - (allVotes/2 + 1); - - uassert(13278, "bad config: isSelf is true for multiple hosts: " + selfs.str(), me <= 1); // dups? - if( me != 1 ) { - stringstream ss; - ss << "can't find self in the replset config"; - if (!serverGlobalParams.isDefaultPort()) ss << " my port: " << serverGlobalParams.port; - if( me != 0 ) ss << " found: " << me; - uasserted(13279, ss.str()); - } - - vector<string> down; - for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { - // we know we're up - if (isSelf(i->h)) { - continue; - } - - BSONObj res; - { - bool ok = false; - try { - int theirVersion = -1000; - ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/); - if( theirVersion >= cfg.version ) { - stringstream ss; - ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure"; - uasserted(13259, ss.str()); - } - } - catch(DBException& e) { - log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog; - } - catch(...) { - log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog; - } - if( res.getBoolField("mismatch") ) - uasserted(13145, "set name does not match the set name host " + i->h.toString() + " expects"); - if( *res.getStringField("set") ) { - if( cfg.version <= 1 ) { - // this was to be initiation, no one should be initiated already. - uasserted(13256, "member " + i->h.toString() + " is already initiated"); - } - else { - // Assure no one has a newer config. - if( res["v"].Int() >= cfg.version ) { - uasserted(13341, "member " + i->h.toString() + " has a config version >= to the new cfg version; cannot change config"); - } - } - } - if( !ok && !res["rs"].trueValue() ) { - down.push_back(i->h.toString()); - - if( !res.isEmpty() ) { - /* strange. got a response, but not "ok". log it. */ - log() << "replSet warning " << i->h.toString() << " replied: " << res.toString() << rsLog; - } - - bool allowFailure = false; - failures += i->votes; - if( !initial && failures <= allowableFailures ) { - const Member* m = theReplSet->findById( i->_id ); - if( m ) { - verify( m->h().toString() == i->h.toString() ); - } - // it's okay if the down member isn't part of the config, - // we might be adding a new member that isn't up yet - allowFailure = true; - } - - if( !allowFailure ) { - string msg = string("need all members up to initiate, not ok : ") + i->h.toString(); - if( !initial ) - msg = string("need most members up to reconfigure, not ok : ") + i->h.toString(); - uasserted(13144, msg); - } - } - } - if( initial ) { - bool hasData = res["hasData"].Bool(); - uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.", - !hasData || isSelf(i->h)); - } - } - if (down.size() > 0) { - result.append("down", down); - } - } -} // namespace - Status LegacyReplicationCoordinator::processReplSetReconfig(OperationContext* txn, - const ReplSetReconfigArgs& args, - BSONObjBuilder* resultObj) { - - if( !args.force && !theReplSet->box.getState().primary() ) { - return Status(ErrorCodes::NotMaster, - "replSetReconfig command must be sent to the current replica set " - "primary."); - } - - try { - { - // just make sure we can get a write lock before doing anything else. we'll - // reacquire one later. of course it could be stuck then, but this check lowers the - // risk if weird things are up - we probably don't want a change to apply 30 minutes - // after the initial attempt. - time_t t = time(0); - Lock::GlobalWrite lk(txn->lockState()); - if( time(0)-t > 20 ) { - return Status(ErrorCodes::ExceededTimeLimit, - "took a long time to get write lock, so not initiating. " - "Initiate when server less busy?"); - } - } - - - scoped_ptr<ReplSetConfig> newConfig(ReplSetConfig::make(txn, args.newConfigObj, args.force)); - - log() << "replSet replSetReconfig config object parses ok, " << - newConfig->members.size() << " members specified" << rsLog; - - Status status = ReplSetConfig::legalChange(theReplSet->getConfig(), *newConfig); - if (!status.isOK()) { - return status; - } - - _checkMembersUpForConfigChange(*newConfig, *resultObj, false); - - log() << "replSet replSetReconfig [2]" << rsLog; - - theReplSet->haveNewConfig(txn, *newConfig, false); - ReplSet::startupStatusMsg.set("replSetReconfig'd"); - } - catch(const DBException& e) { - log() << "replSet replSetReconfig exception: " << e.what() << rsLog; - return e.toStatus(); - } - - resetSlaveCache(); - return Status::OK(); - } - - Status LegacyReplicationCoordinator::processReplSetInitiate(OperationContext* txn, - const BSONObj& configObj, - BSONObjBuilder* resultObj) { - - log() << "replSet replSetInitiate admin command received from client" << rsLog; - - if (!_settings.usingReplSets()) { - return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet"); - } - - if( theReplSet ) { - resultObj->append("info", - "try querying " + rsConfigNs + " to see current configuration"); - return Status(ErrorCodes::AlreadyInitialized, "already initialized"); - } - - try { - { - // just make sure we can get a write lock before doing anything else. we'll - // reacquire one later. of course it could be stuck then, but this check lowers the - // risk if weird things are up. - time_t t = time(0); - Lock::GlobalWrite lk(txn->lockState()); - if( time(0)-t > 10 ) { - return Status(ErrorCodes::ExceededTimeLimit, - "took a long time to get write lock, so not initiating. " - "Initiate when server less busy?"); - } - - /* check that we don't already have an oplog. that could cause issues. - it is ok if the initiating member has *other* data than that. - */ - BSONObj o; - if (Helpers::getSingleton(txn, rsoplog, o)) { - return Status(ErrorCodes::AlreadyInitialized, - rsoplog + string(" is not empty on the initiating member. " - "cannot initiate.")); - } - } - - if( ReplSet::startupStatus == ReplSet::BADCONFIG ) { - resultObj->append("info", ReplSet::startupStatusMsg.get()); - return Status(ErrorCodes::InvalidReplicaSetConfig, - "server already in BADCONFIG state (check logs); not initiating"); - } - if( ReplSet::startupStatus != ReplSet::EMPTYCONFIG ) { - resultObj->append("startupStatus", ReplSet::startupStatus); - resultObj->append("info", _settings.replSet); - return Status(ErrorCodes::InvalidReplicaSetConfig, - "all members and seeds must be reachable to initiate set"); - } - - scoped_ptr<ReplSetConfig> newConfig; - try { - newConfig.reset(ReplSetConfig::make(txn, configObj)); - } catch (const DBException& e) { - log() << "replSet replSetInitiate exception: " << e.what() << rsLog; - return Status(ErrorCodes::InvalidReplicaSetConfig, - mongoutils::str::stream() << "couldn't parse cfg object " << e.what()); - } - - if( newConfig->version > 1 ) { - return Status(ErrorCodes::InvalidReplicaSetConfig, - "can't initiate with a version number greater than 1"); - } - - log() << "replSet replSetInitiate config object parses ok, " << - newConfig->members.size() << " members specified" << rsLog; - - _checkMembersUpForConfigChange(*newConfig, *resultObj, true); - - log() << "replSet replSetInitiate all members seem up" << rsLog; - - Lock::GlobalWrite lk(txn->lockState()); - newConfig->saveConfigLocally(txn, BSONObj()); - log() << "replSet replSetInitiate config now saved locally. " - "Should come online in about a minute." << rsLog; - resultObj->append("info", - "Config now saved locally. Should come online in about a minute."); - ReplSet::startupStatus = ReplSet::SOON; - ReplSet::startupStatusMsg.set("Received replSetInitiate - " - "should come online shortly."); - } - catch(const DBException& e ) { - return e.toStatus(); - } - - return Status::OK(); - } - - Status LegacyReplicationCoordinator::processReplSetGetRBID(BSONObjBuilder* resultObj) { - resultObj->append("rbid", _rbid); - return Status::OK(); - } - -namespace { - bool _shouldVeto(unsigned id, std::string* errmsg) { - const Member* primary = theReplSet->box.getPrimary(); - const Member* hopeful = theReplSet->findById(id); - const Member *highestPriority = theReplSet->getMostElectable(); - - if (!hopeful) { - *errmsg = str::stream() << "replSet couldn't find member with id " << id; - return true; - } - - if (theReplSet->isPrimary() && - theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime) { - // hbinfo is not updated, so we have to check the primary's last optime separately - *errmsg = str::stream() << "I am already primary, " << hopeful->fullName() << - " can try again once I've stepped down"; - return true; - } - - if (primary && - (hopeful->hbinfo().id() != primary->hbinfo().id()) && - (primary->hbinfo().opTime >= hopeful->hbinfo().opTime)) { - // other members might be aware of more up-to-date nodes - *errmsg = str::stream() << hopeful->fullName() << - " is trying to elect itself but " << primary->fullName() << - " is already primary and more up-to-date"; - return true; - } - - if (highestPriority && - highestPriority->config().priority > hopeful->config().priority) { - *errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << - highestPriority->fullName(); - return true; - } - - if (!theReplSet->isElectable(id)) { - *errmsg = str::stream() << "I don't think " << hopeful->fullName() << - " is electable"; - return true; - } - - return false; - } -} // namespace - - Status LegacyReplicationCoordinator::processReplSetFresh(const ReplSetFreshArgs& args, - BSONObjBuilder* resultObj){ - if( args.setName != theReplSet->name() ) { - return Status(ErrorCodes::ReplicaSetNotFound, - str::stream() << "wrong repl set name. Expected: " << - theReplSet->name() << ", received: " << args.setName); - } - - bool weAreFresher = false; - if( theReplSet->config().version > args.cfgver ) { - log() << "replSet member " << args.who << " is not yet aware its cfg version " << - args.cfgver << " is stale" << rsLog; - resultObj->append("info", "config version stale"); - weAreFresher = true; - } - // check not only our own optime, but any other member we can reach - else if( args.opTime < theReplSet->lastOpTimeWritten || - args.opTime < theReplSet->lastOtherOpTime()) { - weAreFresher = true; - } - resultObj->appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); - resultObj->append("fresher", weAreFresher); - - std::string errmsg; - bool veto = _shouldVeto(args.id, &errmsg); - resultObj->append("veto", veto); - if (veto) { - resultObj->append("errmsg", errmsg); - } - - return Status::OK(); - } - - Status LegacyReplicationCoordinator::processReplSetElect(const ReplSetElectArgs& args, - BSONObjBuilder* resultObj) { - theReplSet->electCmdReceived(args.set, args.whoid, args.cfgver, args.round, resultObj); - return Status::OK(); - } - - void LegacyReplicationCoordinator::incrementRollbackID() { - ++_rbid; - } - - Status LegacyReplicationCoordinator::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { - if (theReplSet->freeze(secs)) { - if (secs == 0) { - resultObj->append("info","unfreezing"); - } - } - if (secs == 1) { - resultObj->append("warning", "you really want to freeze for only 1 second?"); - } - return Status::OK(); - } - - Status LegacyReplicationCoordinator::processReplSetSyncFrom(const HostAndPort& target, - BSONObjBuilder* resultObj) { - resultObj->append("syncFromRequested", target.toString()); - - return theReplSet->forceSyncFrom(target.toString(), resultObj); - } - - Status LegacyReplicationCoordinator::processReplSetUpdatePosition( - OperationContext* txn, - const UpdatePositionArgs& updates) { - - for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin(); - update != updates.updatesEnd(); - ++update) { - Status status = _setLastOptime(txn, update->rid, update->ts); - if (!status.isOK()) { - return status; - } - } - return Status::OK(); - } - - Status LegacyReplicationCoordinator::processHandshake(const OperationContext* txn, - const HandshakeArgs& handshake) { - LOG(2) << "Received handshake " << handshake.toBSON(); - - boost::lock_guard<boost::mutex> lock(_mutex); - if (getReplicationMode() != modeReplSet) { - return Status::OK(); - } - - int memberID = handshake.getMemberId(); - Member* member = theReplSet->getMutableMember(memberID); - // it is possible that a node that was removed in a reconfig tried to handshake this node - // in that case, the Member will no longer be in theReplSet's _members List and member - // will be NULL - if (!member) { - return Status(ErrorCodes::NodeNotFound, - str::stream() << "Node with replica set member ID " << memberID << - " could not be found in replica set config while attempting to " - "associate it with RID " << handshake.getRid() << - " in replication handshake. ReplSet config: " << - theReplSet->getConfig().toString()); - } - - _ridMemberMap[handshake.getRid()] = member; - theReplSet->syncSourceFeedback.forwardSlaveHandshake(); - return Status::OK(); - } - - bool LegacyReplicationCoordinator::buildsIndexes() { - return theReplSet->buildIndexes(); - } - - vector<HostAndPort> LegacyReplicationCoordinator::getHostsWrittenTo(const OpTime& op) { - vector<BSONObj> configs = repl::getHostsWrittenTo(op); - vector<HostAndPort> hosts; - for (size_t i = 0; i < configs.size(); ++i) { - hosts.push_back(HostAndPort(configs[i]["host"].String())); - } - return hosts; - } - - vector<HostAndPort> LegacyReplicationCoordinator::getOtherNodesInReplSet() const { - std::vector<HostAndPort> rsMembers; - const unsigned rsSelfId = theReplSet->selfId(); - const std::vector<repl::ReplSetConfig::MemberCfg>& rsMemberConfigs = - repl::theReplSet->config().members; - for (size_t i = 0; i < rsMemberConfigs.size(); ++i) { - const unsigned otherId = rsMemberConfigs[i]._id; - if (rsSelfId == otherId) - continue; - const repl::Member* other = repl::theReplSet->findById(otherId); - if (!other) { - continue; - } - rsMembers.push_back(other->h()); - } - return rsMembers; - } - - Status LegacyReplicationCoordinator::checkIfWriteConcernCanBeSatisfied( - const WriteConcernOptions& writeConcern) const { - // TODO: rewrite this method with the correct version. Note that this just a - // temporary stub for secondary throttle. - - if (getReplicationMode() == ReplicationCoordinator::modeReplSet) { - if (writeConcern.wNumNodes > 1 && theReplSet->config().getMajority() <= 1) { - return Status(ErrorCodes::CannotSatisfyWriteConcern, "not enough nodes"); - } - } - - return Status::OK(); - } - - BSONObj LegacyReplicationCoordinator::getGetLastErrorDefault() { - if (getReplicationMode() == modeReplSet) { - return theReplSet->getLastErrorDefault; - } - return BSONObj(); - } - - bool LegacyReplicationCoordinator::isReplEnabled() const { - return _settings.usingReplSets() || _settings.slave || _settings.master; - } - - HostAndPort LegacyReplicationCoordinator::chooseNewSyncSource() { - const Member* member = theReplSet->getMemberToSyncTo(); - if (member) { - return member->h(); - } - else { - return HostAndPort(); - } - } - - void LegacyReplicationCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) { - theReplSet->veto(host.toString(), until); - } - - void LegacyReplicationCoordinator::resetLastOpTimeFromOplog(OperationContext* txn) { - theReplSet->loadLastOpTimeWritten(txn, false); - } - - bool LegacyReplicationCoordinator::shouldChangeSyncSource(const HostAndPort& currentSource) { - return theReplSet->shouldChangeSyncTarget(currentSource); - } - - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h deleted file mode 100644 index a5009827353..00000000000 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/status.h" -#include "mongo/db/repl/repl_coordinator.h" - -namespace mongo { -namespace repl { - - class Member; - - /** - * An implementation of ReplicationCoordinator that simply delegates to existing code. - */ - class LegacyReplicationCoordinator : public ReplicationCoordinator { - MONGO_DISALLOW_COPYING(LegacyReplicationCoordinator); - - public: - - LegacyReplicationCoordinator(const ReplSettings& settings); - virtual ~LegacyReplicationCoordinator(); - - virtual void startReplication(OperationContext*); - - virtual void shutdown(); - - virtual ReplSettings& getSettings(); - - virtual Mode getReplicationMode() const; - - virtual MemberState getCurrentMemberState() const; - - virtual Seconds getSlaveDelaySecs() const; - - virtual void clearSyncSourceBlacklist(); - - virtual ReplicationCoordinator::StatusAndDuration awaitReplication( - const OperationContext* txn, - const OpTime& ts, - const WriteConcernOptions& writeConcern); - - virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpForClient( - const OperationContext* txn, - const WriteConcernOptions& writeConcern); - - virtual ReplicationCoordinator::StatusAndDuration awaitReplicationOfLastOpApplied( - const OperationContext* txn, - const WriteConcernOptions& writeConcern); - - virtual Status stepDown(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime); - - virtual bool isMasterForReportingPurposes(); - - virtual bool canAcceptWritesForDatabase(const StringData& dbName); - - virtual Status checkIfWriteConcernCanBeSatisfied( - const WriteConcernOptions& writeConcern) const; - - virtual Status checkCanServeReadsFor(OperationContext* txn, - const NamespaceString& ns, - bool slaveOk); - - virtual bool shouldIgnoreUniqueIndex(const IndexDescriptor* idx); - - virtual Status setLastOptimeForSlave(OperationContext* txn, - const OID& rid, - const OpTime& ts); - - virtual Status setMyLastOptime(OperationContext* txn, const OpTime& ts); - - virtual void setMyHeartbeatMessage(const std::string& msg); - - virtual OpTime getMyLastOptime() const; - - virtual OID getElectionId(); - - virtual OID getMyRID() const; - - virtual int getMyId() const; - - virtual bool setFollowerMode(const MemberState& newState); - - virtual bool isWaitingForApplierToDrain(); - - virtual void signalDrainComplete(); - - virtual void signalUpstreamUpdater(); - - virtual void prepareReplSetUpdatePositionCommand(OperationContext* txn, - BSONObjBuilder* cmdBuilder); - - virtual void prepareReplSetUpdatePositionCommandHandshakes( - OperationContext* txn, - std::vector<BSONObj>* handshakes); - - virtual Status processReplSetGetStatus(BSONObjBuilder* result); - - virtual void fillIsMasterForReplSet(IsMasterResponse* result); - - virtual void appendSlaveInfoData(BSONObjBuilder* result); - - virtual void processReplSetGetConfig(BSONObjBuilder* result); - - virtual Status setMaintenanceMode(OperationContext* txn, bool activate); - - virtual bool getMaintenanceMode(); - - virtual Status processReplSetSyncFrom(const HostAndPort& target, - BSONObjBuilder* resultObj); - - virtual Status processReplSetFreeze(int secs, BSONObjBuilder* resultObj); - - virtual Status processHeartbeat(const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response); - - virtual Status processReplSetReconfig(OperationContext* txn, - const ReplSetReconfigArgs& args, - BSONObjBuilder* resultObj); - - virtual Status processReplSetInitiate(OperationContext* txn, - const BSONObj& configObj, - BSONObjBuilder* resultObj); - - virtual Status processReplSetGetRBID(BSONObjBuilder* resultObj); - - virtual void incrementRollbackID(); - - virtual Status processReplSetFresh(const ReplSetFreshArgs& args, - BSONObjBuilder* resultObj); - - virtual Status processReplSetElect(const ReplSetElectArgs& args, - BSONObjBuilder* resultObj); - - virtual Status processReplSetUpdatePosition(OperationContext* txn, - const UpdatePositionArgs& updates); - - virtual Status processHandshake(const OperationContext* txn, - const HandshakeArgs& handshake); - - virtual bool buildsIndexes(); - - virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op); - - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const; - - virtual BSONObj getGetLastErrorDefault(); - - virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); - - virtual bool isReplEnabled() const; - - virtual HostAndPort chooseNewSyncSource(); - - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); - - virtual void resetLastOpTimeFromOplog(OperationContext* txn); - - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); - - private: - - bool _setMaintenanceMode_inlock(OperationContext* txn, bool activate); - - Status _setLastOptime(OperationContext* txn, const OID& rid, const OpTime& ts); - - // Mutex that protects the _slaveOpTimeMap, and _maintenceMode - mutable boost::mutex _mutex; - - // Map from RID to Member pointer for replica set nodes - typedef std::map<OID, Member*> OIDMemberMap; - OIDMemberMap _ridMemberMap; - - // Maps nodes in this replication group to the last oplog operation they have committed - // TODO(spencer): change to unordered_map - typedef std::map<OID, OpTime> SlaveOpTimeMap; - SlaveOpTimeMap _slaveOpTimeMap; - - // Count of active callers into maintenance mode - int _maintenanceMode; - - // Rollback id. used to check if a rollback happened during some interval of time - // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. - int _rbid; - - // Our RID, used to identify us to our sync source when sending replication progress - // updates upstream. Set once at startup and then never modified again. - OID _myRID; - - ReplSettings _settings; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/repl_info.cpp b/src/mongo/db/repl/repl_info.cpp index 7ac993ad457..42d45c95fb5 100644 --- a/src/mongo/db/repl/repl_info.cpp +++ b/src/mongo/db/repl/repl_info.cpp @@ -43,7 +43,6 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/storage_options.h" #include "mongo/db/wire_version.h" #include "mongo/s/write_ops/batched_command_request.h" diff --git a/src/mongo/db/repl/repl_set_health_poll_task.cpp b/src/mongo/db/repl/repl_set_health_poll_task.cpp deleted file mode 100644 index 781cb735352..00000000000 --- a/src/mongo/db/repl/repl_set_health_poll_task.cpp +++ /dev/null @@ -1,304 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful,b -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/repl_set_health_poll_task.h" - -#include "mongo/bson/bsonelement.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/heartbeat.h" -#include "mongo/db/repl/manager.h" -#include "mongo/db/repl/member.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rs_config.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/util/log.h" - -namespace mongo { - -namespace repl { - - int ReplSetHealthPollTask::s_try_offset = 0; - - ReplSetHealthPollTask::ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) - : h(hh), m(mm), tries(s_try_offset), threshold(15), - _timeout(ReplSetConfig::DEFAULT_HB_TIMEOUT) { - - if (theReplSet) { - _timeout = theReplSet->config().getHeartbeatTimeout(); - } - - // doesn't need protection, all health tasks are created in a single thread - s_try_offset += 7; - } - - void ReplSetHealthPollTask::doWork() { - if ( !theReplSet ) { - LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog; - return; - } - - HeartbeatInfo mem = m; - HeartbeatInfo old = mem; - try { - BSONObj info; - int theirConfigVersion = -10000; - - bool ok = _requestHeartbeat(mem, info, theirConfigVersion); - - // weight new ping with old pings - // on the first ping, just use the ping value - if (old.ping != 0) { - mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); - } - - if( ok ) { - up(info, mem); - } - else if (info["code"].numberInt() == ErrorCodes::Unauthorized || - info["errmsg"].str() == "unauthorized") { - - authIssue(mem); - } - else { - down(mem, info.getStringField("errmsg")); - } - } - catch (const DBException& e) { - log() << "replSet health poll task caught a DBException: " << e.what(); - down(mem, e.what()); - } - catch (const std::exception& e) { - log() << "replSet health poll task caught an exception: " << e.what(); - down(mem, e.what()); - } - m = mem; - - theReplSet->mgr->send( stdx::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) ); - - static time_t last = 0; - time_t now = time(0); - bool changed = mem.changed(old); - if( changed ) { - if( old.hbstate != mem.hbstate ) - log() << "replSet member " << h.toString() << " is now in state " - << mem.hbstate.toString() << rsLog; - } - if( changed || now-last>4 ) { - last = now; - theReplSet->mgr->send( stdx::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); - } - } - - bool ReplSetHealthPollTask::tryHeartbeat(BSONObj* info, int* theirConfigVersion) { - bool ok = false; - - try { - ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), - h.toString(), *info, theReplSet->config().version, - *theirConfigVersion); - } - catch (DBException&) { - // don't do anything, ok is already false - } - - return ok; - } - - bool ReplSetHealthPollTask::_requestHeartbeat(HeartbeatInfo& mem, - BSONObj& info, - int& theirConfigVersion) { - { - ScopedConn conn(h.toString()); - conn.setTimeout(_timeout); - if (tries++ % threshold == (threshold - 1)) { - conn.reconnect(); - } - } - - Timer timer; - time_t before = curTimeMicros64() / 1000000; - - bool ok = tryHeartbeat(&info, &theirConfigVersion); - - mem.ping = static_cast<unsigned int>(timer.millis()); - time_t totalSecs = mem.ping / 1000; - - // if that didn't work and we have more time, lower timeout and try again - if (!ok && totalSecs < _timeout) { - log() << "replset info " << h.toString() << " heartbeat failed, retrying" << rsLog; - - // lower timeout to remaining ping time - { - ScopedConn conn(h.toString()); - conn.setTimeout(_timeout - totalSecs); - } - - int checkpoint = timer.millis(); - timer.reset(); - ok = tryHeartbeat(&info, &theirConfigVersion); - mem.ping = static_cast<unsigned int>(timer.millis()); - totalSecs = (checkpoint + mem.ping)/1000; - - // set timeout back to default - { - ScopedConn conn(h.toString()); - conn.setTimeout(_timeout); - } - } - - // we set this on any response - we don't get this far if - // couldn't connect because exception is thrown - time_t after = mem.lastHeartbeat = before + totalSecs; - - if ( info["time"].isNumber() ) { - long long t = info["time"].numberLong(); - if( t > after ) - mem.skew = (int) (t - after); - else if( t < before ) - mem.skew = (int) (t - before); // negative - } - else { - // it won't be there if remote hasn't initialized yet - if( info.hasElement("time") ) - warning() << "heatbeat.time isn't a number: " << info << endl; - mem.skew = INT_MIN; - } - - { - BSONElement state = info["state"]; - if( state.ok() ) - mem.hbstate = MemberState(state.Int()); - } - - if (info.hasField("stateDisagreement") && info["stateDisagreement"].trueValue()) { - log() << "replset info " << h.toString() << " thinks that we are down" << endl; - } - - return ok; - } - - void ReplSetHealthPollTask::authIssue(HeartbeatInfo& mem) { - mem.authIssue = true; - mem.hbstate = MemberState::RS_UNKNOWN; - - // set health to 0 so that this doesn't count towards majority - mem.health = 0.0; - theReplSet->rmFromElectable(mem.id()); - } - - void ReplSetHealthPollTask::down(HeartbeatInfo& mem, string msg) { - // if we've received a heartbeat from this member within the last two seconds, don't - // change its state to down (if it's already down, leave it down since we don't have - // any info about it other than it's heartbeating us) - const Member* oldMemInfo = theReplSet->findById(mem.id()); - if (oldMemInfo && oldMemInfo->hbinfo().lastHeartbeatRecv+2 >= time(0)) { - log() << "replset info " << h.toString() - << " just heartbeated us, but our heartbeat failed: " << msg - << ", not changing state" << rsLog; - // we don't update any of the heartbeat info, though, since we didn't get any info - // other than "not down" from having it heartbeat us - return; - } - - mem.authIssue = false; - mem.health = 0.0; - mem.ping = 0; - if( mem.upSince || mem.downSince == 0 ) { - mem.upSince = 0; - mem.downSince = jsTime(); - mem.hbstate = MemberState::RS_DOWN; - log() << "replSet info " << h.toString() << " is down (or slow to respond): " - << msg << rsLog; - } - mem.lastHeartbeatMsg = msg; - theReplSet->rmFromElectable(mem.id()); - } - - void ReplSetHealthPollTask::up(const BSONObj& info, HeartbeatInfo& mem) { - HeartbeatInfo::numPings++; - mem.authIssue = false; - - if( mem.upSince == 0 ) { - log() << "replSet member " << h.toString() << " is up" << rsLog; - mem.upSince = mem.lastHeartbeat; - } - mem.health = 1.0; - mem.lastHeartbeatMsg = info["hbmsg"].String(); - if (info.hasElement("syncingTo")) { - mem.syncingTo = info["syncingTo"].String(); - } - else { - // empty out syncingTo since they are no longer syncing to anyone - mem.syncingTo = ""; - } - - if( info.hasElement("opTime") ) - mem.opTime = info["opTime"].Date(); - - // see if this member is in the electable set - if( info["e"].eoo() ) { - // for backwards compatibility - const Member *member = theReplSet->findById(mem.id()); - if (member && member->config().potentiallyHot()) { - theReplSet->addToElectable(mem.id()); - } - else { - theReplSet->rmFromElectable(mem.id()); - } - } - // add this server to the electable set if it is within 10 - // seconds of the latest optime we know of - else if( info["e"].trueValue() && - mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { - unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); - if (lastOp > 0 && mem.opTime >= lastOp - 10) { - theReplSet->addToElectable(mem.id()); - } - } - else { - theReplSet->rmFromElectable(mem.id()); - } - - BSONElement cfg = info["config"]; - if( cfg.ok() ) { - // received a new config - stdx::function<void()> f = - stdx::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); - theReplSet->mgr->send(f); - } - if (info.hasElement("electionTime")) { - LOG(4) << "setting electionTime to " << info["electionTime"]; - mem.electionTime = info["electionTime"].Date(); - } - } -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/repl_set_health_poll_task.h b/src/mongo/db/repl/repl_set_health_poll_task.h deleted file mode 100644 index 5e53e32d56b..00000000000 --- a/src/mongo/db/repl/repl_set_health_poll_task.h +++ /dev/null @@ -1,95 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful,b -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include <string> - -#include "mongo/db/repl/heartbeat_info.h" -#include "mongo/bson/bsonobj.h" -#include "mongo/util/concurrency/task.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { -namespace repl { - - /** - * Poll every other set member to check its status. - * - * A detail about local machines and authentication: suppose we have 2 - * members, A and B, on the same machine using different keyFiles. A is - * primary. If we're just starting the set, there are no admin users, so A - * and B can access each other because it's local access. - * - * Then we add a user to A. B cannot sync this user from A, because as soon - * as we add a an admin user, A requires auth. However, A can still - * heartbeat B, because B *doesn't* have an admin user. So A can reach B - * but B cannot reach A. - * - * Once B is restarted with the correct keyFile, everything should work as - * expected. - */ - class ReplSetHealthPollTask : public task::Task { - private: - /** - * Each healthpoll task reconnects periodically. By starting each task at a different - * number for "tries", the tasks will reconnect at different times, minimizing the impact - * of network blips. - */ - static int s_try_offset; - - HostAndPort h; - HeartbeatInfo m; - int tries; - const int threshold; - public: - ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm); - - string name() const { return "rsHealthPoll"; } - - void setUp() { } - - void doWork(); - - private: - bool tryHeartbeat(BSONObj* info, int* theirConfigVersion); - - bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion); - - void authIssue(HeartbeatInfo& mem); - - void down(HeartbeatInfo& mem, string msg); - - void up(const BSONObj& info, HeartbeatInfo& mem); - - // Heartbeat timeout - time_t _timeout; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp deleted file mode 100644 index 24039b2d17c..00000000000 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ /dev/null @@ -1,1028 +0,0 @@ -/** -* Copyright (C) 2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/repl_set_impl.h" - -#include "mongo/db/client.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/global_environment_experiment.h" -#include "mongo/db/index_rebuilder.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/isself.h" -#include "mongo/db/repl/minvalid.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplogreader.h" -#include "mongo/db/repl/repl_set_seed_list.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/db/storage/storage_engine.h" -#include "mongo/s/d_state.h" -#include "mongo/util/background.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { - -namespace repl { - - void ReplSetImpl::sethbmsg(const std::string& s, int logLevel) { - static time_t lastLogged; - _hbmsgTime = time(0); - - if (s == _hbmsg) { - // unchanged - if (_hbmsgTime - lastLogged < 60) - return; - } - - unsigned sz = s.size(); - if (sz >= 256) - memcpy(_hbmsg, s.c_str(), 255); - else { - _hbmsg[sz] = 0; - memcpy(_hbmsg, s.c_str(), sz); - } - if (!s.empty()) { - lastLogged = _hbmsgTime; - LOG(logLevel) << "replSet " << s << rsLog; - } - } - - void ReplSetImpl::goStale(OperationContext* txn, const Member* stale, const BSONObj& oldest) { - log() << "replSet error RS102 too stale to catch up, at least from " - << stale->fullName() << rsLog; - log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; - log() << "replSet oldest at " << stale->fullName() << " : " - << oldest["ts"]._opTime().toStringLong() << rsLog; - log() << "replSet See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember" - << rsLog; - - // reset minvalid so that we can't become primary prematurely - setMinValid(txn, oldest["ts"]._opTime()); - - sethbmsg("error RS102 too stale to catch up"); - changeState(MemberState::RS_RECOVERING); - } - -namespace { - static void dropAllTempCollections(OperationContext* txn) { - vector<string> dbNames; - StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine(); - storageEngine->listDatabases( &dbNames ); - - for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) { - // The local db is special because it isn't replicated. It is cleared at startup even on - // replica set members. - if (*it == "local") - continue; - - Client::Context ctx(txn, *it); - ctx.db()->clearTmpCollections(txn); - } - } -} - - void ReplSetImpl::_assumePrimary() { - LOG(1) << "replSet assuming primary" << endl; - verify(iAmPotentiallyHot()); - - // Wait for replication to stop and buffer to be consumed - LOG(1) << "replSet waiting for replication to finish before becoming primary" << endl; - BackgroundSync::get()->stopReplicationAndFlushBuffer(); - - // Lock here to prevent stepping down & becoming primary from getting interleaved - LOG(1) << "replSet waiting for global write lock"; - - OperationContextImpl txn; // XXX? - Lock::GlobalWrite lk(txn.lockState()); - - initOpTimeFromOplog(&txn, "local.oplog.rs"); - - // Generate new election unique id - elect.setElectionId(OID::gen()); - LOG(1) << "replSet truly becoming primary"; - changeState(MemberState::RS_PRIMARY); - - // This must be done after becoming primary but before releasing the write lock. This adds - // the dropCollection entries for every temp collection to the opLog since we want it to be - // replicated to secondaries. - dropAllTempCollections(&txn); - } - - void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } - - Member* ReplSetImpl::getMostElectable() { - lock lk(this); - - Member *max = 0; - set<unsigned>::iterator it = _electableSet.begin(); - while (it != _electableSet.end()) { - const Member *temp = findById(*it); - if (!temp) { - log() << "couldn't find member: " << *it << endl; - set<unsigned>::iterator it_delete = it; - it++; - _electableSet.erase(it_delete); - continue; - } - if (!max || max->config().priority < temp->config().priority) { - max = (Member*)temp; - } - it++; - } - - return max; - } - - void ReplSetImpl::relinquish(OperationContext* txn) { - { - Lock::GlobalWrite writeLock(txn->lockState()); // so we are synchronized with _logOp() - - LOG(2) << "replSet attempting to relinquish" << endl; - if (box.getState().primary()) { - log() << "replSet relinquishing primary state" << rsLog; - changeState(MemberState::RS_SECONDARY); - - // close sockets that were talking to us so they don't blithly send many writes that - // will fail with "not master" (of course client could check result code, but in - // case they are not) - log() << "replSet closing client sockets after relinquishing primary" << rsLog; - MessagingPort::closeAllSockets(ScopedConn::keepOpen); - } - else if (box.getState().startup2()) { - // This block probably isn't necessary - changeState(MemberState::RS_RECOVERING); - return; - } - } - - // now that all connections were closed, strip this mongod from all sharding details if and - // when it gets promoted to a primary again, only then it should reload the sharding state - // the rationale here is that this mongod won't bring stale state when it regains - // primaryhood - shardingState.resetShardingState(); - } - - // look freshly for who is primary - includes relinquishing ourself. - void ReplSetImpl::forgetPrimary(OperationContext* txn) { - if (box.getState().primary()) - relinquish(txn); - else { - box.setOtherPrimary(0); - } - } - - // for the replSetStepDown command - bool ReplSetImpl::_stepDown(OperationContext* txn, int secs) { - lock lk(this); - if (box.getState().primary()) { - elect.steppedDown = time(0) + secs; - log() << "replSet info stepping down as primary secs=" << secs << rsLog; - relinquish(txn); - return true; - } - return false; - } - - bool ReplSetImpl::_freeze(int secs) { - lock lk(this); - // note if we are primary we remain primary but won't try to elect ourself again until - // this time period expires. - if (secs == 0) { - elect.steppedDown = 0; - log() << "replSet info 'unfreezing'" << rsLog; - } - else { - if (!box.getState().primary()) { - elect.steppedDown = time(0) + secs; - log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog; - } - else { - log() << "replSet info received freeze command but we are primary" << rsLog; - } - } - return true; - } - - void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { - for (Member *m = _members.head(); m; m=m->next()) { - if (static_cast<int>(m->id()) == h.id()) { - m->_hbinfo.updateFromLastPoll(h); - return; - } - } - } - - void ReplSetImpl::msgUpdateHBRecv(unsigned id, time_t newTime) { - for (Member *m = _members.head(); m; m = m->next()) { - if (m->id() == id) { - m->_hbinfo.lastHeartbeatRecv = newTime; - return; - } - } - } - - list<HostAndPort> ReplSetImpl::memberHostnames() const { - list<HostAndPort> L; - L.push_back(_self->h()); - for (Member *m = _members.head(); m; m = m->next()) - L.push_back(m->h()); - return L; - } - - void ReplSetImpl::_fillIsMasterHost(const Member *m, - vector<string>& hosts, - vector<string>& passives, - vector<string>& arbiters) { - verify(m); - if (m->config().hidden) - return; - - if (m->potentiallyHot()) { - hosts.push_back(m->h().toString()); - } - else if (!m->config().arbiterOnly) { - if (m->config().slaveDelay) { - // hmmm - we don't list these as they are stale. - } - else { - passives.push_back(m->h().toString()); - } - } - else { - arbiters.push_back(m->h().toString()); - } - } - - void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) { - lock lk(this); - - const StateBox::SP sp = box.get(); - bool isp = sp.state.primary(); - b.append("setName", name()); - b.append("setVersion", version()); - b.append("ismaster", isp); - b.append("secondary", sp.state.secondary()); - { - vector<string> hosts, passives, arbiters; - _fillIsMasterHost(_self, hosts, passives, arbiters); - - for (Member *m = _members.head(); m; m = m->next()) { - verify(m); - _fillIsMasterHost(m, hosts, passives, arbiters); - } - - if (hosts.size() > 0) { - b.append("hosts", hosts); - } - if (passives.size() > 0) { - b.append("passives", passives); - } - if (arbiters.size() > 0) { - b.append("arbiters", arbiters); - } - } - - if (!isp) { - const Member *m = sp.primary; - if (m) - b.append("primary", m->h().toString()); - } - else { - b.append("primary", _self->fullName()); - } - - if (myConfig().arbiterOnly) - b.append("arbiterOnly", true); - if (myConfig().priority == 0 && !myConfig().arbiterOnly) - b.append("passive", true); - if (myConfig().slaveDelay) - b.append("slaveDelay", myConfig().slaveDelay); - if (myConfig().hidden) - b.append("hidden", true); - if (!myConfig().buildIndexes) - b.append("buildIndexes", false); - if (!myConfig().tags.empty()) { - BSONObjBuilder a; - for (map<string,string>::const_iterator i = myConfig().tags.begin(); - i != myConfig().tags.end(); - i++) { - a.append((*i).first, (*i).second); - } - b.append("tags", a.done()); - } - b.append("me", myConfig().h.toString()); - } - - void ReplSetImpl::init(OperationContext* txn, ReplSetSeedList& replSetSeedList) { - mgr = new Manager(this); - - _cfg = 0; - memset(_hbmsg, 0, sizeof(_hbmsg)); - strcpy(_hbmsg , "initial startup"); - changeState(MemberState::RS_STARTUP); - - _seeds = &replSetSeedList.seeds; - - LOG(1) << "replSet beginning startup..." << rsLog; - - loadConfig(txn); - - unsigned sss = replSetSeedList.seedSet.size(); - for (Member *m = head(); m; m = m->next()) { - replSetSeedList.seedSet.erase(m->h()); - } - for (set<HostAndPort>::iterator i = replSetSeedList.seedSet.begin(); - i != replSetSeedList.seedSet.end(); - i++) { - if (isSelf(*i)) { - if (sss == 1) { - LOG(1) << "replSet warning self is listed in the seed list and there are no " - "other seeds listed did you intend that?" << rsLog; - } - } - else { - log() << "replSet warning command line seed " << i->toString() - << " is not present in the current repl set config" << rsLog; - } - } - } - - ReplSetImpl::ReplSetImpl() : - elect(this), - _forceSyncTarget(0), - _hbmsgTime(0), - _self(0), - _maintenanceMode(0), - mgr(0) { - } - - void ReplSetImpl::loadLastOpTimeWritten(OperationContext* txn, bool quiet) { - Lock::DBRead lk(txn->lockState(), rsoplog); - BSONObj o; - if (Helpers::getLast(txn, rsoplog, o)) { - OpTime lastOpTime = o["ts"]._opTime(); - uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTime.isNull()); - getGlobalReplicationCoordinator()->setMyLastOptime(txn, lastOpTime); - } - else { - getGlobalReplicationCoordinator()->setMyLastOptime(txn, OpTime()); - } - } - - // call after constructing to start - returns fairly quickly after launching its threads - void ReplSetImpl::_go() { - OperationContextImpl txn; - - try { - // Note: this sets lastOpTimeWritten, which the Applier uses to determine whether to - // do an initial sync or not. - loadLastOpTimeWritten(&txn); - } - catch (std::exception& e) { - log() << "replSet error fatal couldn't query the local " << rsoplog - << " collection. Terminating mongod after 30 seconds." << rsLog; - log() << e.what() << rsLog; - sleepsecs(30); - dbexit(EXIT_REPLICATION_ERROR); - return; - } - - // initialize _me in SyncSourceFeedback - bool meEnsured = false; - while (!inShutdown() && !meEnsured) { - try { - syncSourceFeedback.ensureMe(&txn); - meEnsured = true; - } - catch (const DBException& e) { - warning() << "failed to write to local.me: " << e.what() - << " trying again in one second"; - sleepsecs(1); - } - } - - - bool worked = getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_STARTUP2); - invariant(worked); - startThreads(); - newReplUp(); // oplog.cpp - } - - void ReplSetImpl::setSelfTo(Member *m) { - // already locked in initFromConfig - _self = m; - _id = m->id(); - _config = m->config(); - if (m) _buildIndexes = m->config().buildIndexes; - else _buildIndexes = true; - } - - // @param reconf true if this is a reconfiguration and not an initial load of the configuration. - // @return true if ok; throws if config really bad; false if config doesn't include self - bool ReplSetImpl::initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf) { - // NOTE: haveNewConfig() writes the new config to disk before we get here. So - // we cannot error out at this point, except fatally. Check errors earlier. - lock lk(this); - - if (!getLastErrorDefault.isEmpty() || !c.getLastErrorDefaults.isEmpty()) { - getLastErrorDefault = c.getLastErrorDefaults; - } - - list<ReplSetConfig::MemberCfg*> newOnes; - // additive short-cuts the new config setup. If we are just adding a - // node/nodes and nothing else is changing, this is additive. If it's - // not a reconfig, we're not adding anything - bool additive = reconf; - bool updateConfigs = false; - { - unsigned nfound = 0; - int me = 0; - for (vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); - i != c.members.end(); - i++) { - - ReplSetConfig::MemberCfg& m = *i; - if (isSelf(m.h)) { - me++; - } - - if (reconf) { - const Member *old = findById(m._id); - if (old) { - nfound++; - verify((int) old->id() == m._id); - if (!old->config().isSameIgnoringTags(m)) { - additive = false; - } - if (!updateConfigs && old->config() != m) { - updateConfigs = true; - } - } - else { - newOnes.push_back(&m); - } - } - } - if (me == 0) { // we're not in the config -- we must have been removed - if (state().removed()) { - // already took note of our ejection from the set - // so just sit tight and poll again - return false; - } - - _members.orphanAll(); - - // kill off rsHealthPoll threads (because they Know Too Much about our past) - endOldHealthTasks(); - - // clear sync target to avoid faulty sync attempts; we must do this before we - // close sockets, since that will trigger the bgsync thread to reconnect. - BackgroundSync::get()->clearSyncTarget(); - - // close sockets to force clients to re-evaluate this member - MessagingPort::closeAllSockets(0); - - // take note of our ejection - changeState(MemberState::RS_REMOVED); - - // go into holding pattern - log() << "replSet info self not present in the repl set configuration:" << rsLog; - log() << c.toString() << rsLog; - - loadConfig(txn); // redo config from scratch - return false; - } - uassert(13302, "replSet error self appears twice in the repl set configuration", me<=1); - - if (state().removed()) { - // If we were removed and have now been added back in, switch state. - changeState(MemberState::RS_RECOVERING); - } - - // if we found different members that the original config, reload everything - if (reconf && config().members.size() != nfound) - additive = false; - } - - // If we are changing chaining rules, we don't want this to be an additive reconfig so that - // the primary can step down and the sync targets change. - // TODO: This can be removed once SERVER-5208 is fixed. - if (reconf && config().chainingAllowed() != c.chainingAllowed()) { - additive = false; - } - - _cfg = new ReplSetConfig(c); - - // config() is same thing but const, so we use that when we can for clarity below - dassert(&config() == _cfg); - verify(config().ok()); - verify(_name.empty() || _name == config()._id); - _name = config()._id; - verify(!_name.empty()); - - // this is a shortcut for simple changes - if (additive) { - log() << "replSet info : additive change to configuration" << rsLog; - if (updateConfigs) { - // we have new configs for existing members, so we need to repopulate _members - // with the most recent configs - _members.orphanAll(); - - // for logging - string members = ""; - - // not setting _self to 0 as other threads use _self w/o locking - int me = 0; - for(vector<ReplSetConfig::MemberCfg>::const_iterator i = config().members.begin(); - i != config().members.end(); i++) { - const ReplSetConfig::MemberCfg& m = *i; - Member *mi; - members += (members == "" ? "" : ", ") + m.h.toString(); - if (isSelf(m.h)) { - verify(me++ == 0); - mi = new Member(m.h, m._id, &m, true); - setSelfTo(mi); - } - else { - mi = new Member(m.h, m._id, &m, false); - _members.push(mi); - } - } - // trigger a handshake to update the syncSource of our writeconcern information - syncSourceFeedback.forwardSlaveHandshake(); - } - - // add any new members - for (list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); - i != newOnes.end(); - i++) { - ReplSetConfig::MemberCfg *m = *i; - Member *mi = new Member(m->h, m->_id, m, false); - - // we will indicate that new members are up() initially so that we don't relinquish - // our primary state because we can't (transiently) see a majority. they should be - // up as we check that new members are up before getting here on reconfig anyway. - mi->get_hbinfo().health = 0.1; - - _members.push(mi); - startHealthTaskFor(mi); - } - - // if we aren't creating new members, we may have to update the - // groups for the current ones - _cfg->updateMembers(_members); - - return true; - } - - // start with no members. if this is a reconfig, drop the old ones. - _members.orphanAll(); - - endOldHealthTasks(); - - int oldPrimaryId = -1; - { - const Member *p = box.getPrimary(); - if (p) - oldPrimaryId = p->id(); - } - forgetPrimary(txn); - - // not setting _self to 0 as other threads use _self w/o locking - int me = 0; - - // For logging - string members = ""; - - for (vector<ReplSetConfig::MemberCfg>::const_iterator i = config().members.begin(); - i != config().members.end(); - i++) { - const ReplSetConfig::MemberCfg& m = *i; - Member *mi; - members += (members == "" ? "" : ", ") + m.h.toString(); - if (isSelf(m.h)) { - verify(me++ == 0); - mi = new Member(m.h, m._id, &m, true); - if (!reconf) { - log() << "replSet I am " << m.h.toString() << rsLog; - } - setSelfTo(mi); - - if ((int)mi->id() == oldPrimaryId) - box.setSelfPrimary(mi); - } - else { - mi = new Member(m.h, m._id, &m, false); - _members.push(mi); - if ((int)mi->id() == oldPrimaryId) - box.setOtherPrimary(mi); - } - } - - if (me == 0){ - log() << "replSet warning did not detect own host in full reconfig, members " - << members << " config: " << c << rsLog; - } - else { - // Do this after we've found ourselves, since _self needs - // to be set before we can start the heartbeat tasks - for (Member *mb = _members.head(); mb; mb=mb->next()) { - startHealthTaskFor(mb); - } - } - return true; - } - - // Our own config must be the first one. - bool ReplSetImpl::_loadConfigFinish(OperationContext* txn, vector<ReplSetConfig*>& cfgs) { - int v = -1; - ReplSetConfig *highest = 0; - int myVersion = -2000; - int n = 0; - for (vector<ReplSetConfig*>::iterator i = cfgs.begin(); i != cfgs.end(); i++) { - ReplSetConfig* cfg = *i; - DEV { LOG(1) << n+1 << " config shows version " << cfg->version << rsLog; } - if (++n == 1) myVersion = cfg->version; - if (cfg->ok() && cfg->version > v) { - highest = cfg; - v = cfg->version; - } - } - verify(highest); - - if (!initFromConfig(txn, *highest)) - return false; - - if (highest->version > myVersion && highest->version >= 0) { - log() << "replSet got config version " << highest->version - << " from a remote, saving locally" << rsLog; - highest->saveConfigLocally(txn, BSONObj()); - } - return true; - } - - void ReplSetImpl::loadConfig(OperationContext* txn) { - startupStatus = LOADINGCONFIG; - startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)"); - LOG(1) << "loadConfig() " << rsConfigNs << endl; - - while (1) { - try { - OwnedPointerVector<ReplSetConfig> configs; - try { - configs.mutableVector().push_back(ReplSetConfig::makeDirect(txn)); - } - catch (DBException& e) { - log() << "replSet exception loading our local replset configuration object : " - << e.toString() << rsLog; - } - for (vector<HostAndPort>::const_iterator i = _seeds->begin(); - i != _seeds->end(); - i++) { - try { - configs.mutableVector().push_back(ReplSetConfig::make(txn, *i)); - } - catch (DBException& e) { - log() << "replSet exception trying to load config from " << *i - << " : " << e.toString() << rsLog; - } - } - ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - { - scoped_lock lck(replSettings.discoveredSeeds_mx); - if (replSettings.discoveredSeeds.size() > 0) { - for (set<string>::iterator i = replSettings.discoveredSeeds.begin(); - i != replSettings.discoveredSeeds.end(); - i++) { - try { - configs.mutableVector().push_back( - ReplSetConfig::make(txn, HostAndPort(*i))); - } - catch (DBException&) { - LOG(1) << "replSet exception trying to load config from discovered " - "seed " << *i << rsLog; - replSettings.discoveredSeeds.erase(*i); - } - } - } - } - - if (!replSettings.reconfig.isEmpty()) { - try { - configs.mutableVector().push_back(ReplSetConfig::make(txn, replSettings.reconfig, - true)); - } - catch (DBException& re) { - log() << "replSet couldn't load reconfig: " << re.what() << rsLog; - replSettings.reconfig = BSONObj(); - } - } - - int nok = 0; - int nempty = 0; - for (vector<ReplSetConfig*>::iterator i = configs.mutableVector().begin(); - i != configs.mutableVector().end(); i++) { - if ((*i)->ok()) - nok++; - if ((*i)->empty()) - nempty++; - } - if (nok == 0) { - - if (nempty == (int) configs.mutableVector().size()) { - startupStatus = EMPTYCONFIG; - startupStatusMsg.set("can't get " + rsConfigNs + - " config from self or any seed (EMPTYCONFIG)"); - log() << "replSet can't get " << rsConfigNs - << " config from self or any seed (EMPTYCONFIG)" << rsLog; - static unsigned once; - if (++once == 1) { - log() << "replSet info you may need to run replSetInitiate -- rs.initia" - "te() in the shell -- if that is not already done" << rsLog; - } - if (_seeds->size() == 0) { - LOG(1) << "replSet info no seed hosts were specified on the --replSet " - "command line" << rsLog; - } - } - else { - startupStatus = EMPTYUNREACHABLE; - startupStatusMsg.set("can't currently get " + rsConfigNs + - " config from self or any seed (EMPTYUNREACHABLE)"); - log() << "replSet can't get " << rsConfigNs - << " config from self or any seed (yet)" << rsLog; - } - - sleepsecs(1); - continue; - } - - if (!_loadConfigFinish(txn, configs.mutableVector())) { - log() << "replSet info Couldn't load config yet. Sleeping 3 sec and will try " - "again." << rsLog; - sleepsecs(3); - continue; - } - } - catch (DBException& e) { - startupStatus = BADCONFIG; - startupStatusMsg.set("replSet error loading set config (BADCONFIG)"); - log() << "replSet error loading configurations " << e.toString() << rsLog; - log() << "replSet error replication will not start" << rsLog; - sethbmsg("error loading set config"); - fassertFailedNoTrace(18754); - throw; - } - break; - } - startupStatusMsg.set("? started"); - startupStatus = STARTED; - } - const Member* ReplSetImpl::getMemberToSyncTo() { - lock lk(this); - - // if we have a target we've requested to sync from, use it - - if (_forceSyncTarget) { - Member* target = _forceSyncTarget; - _forceSyncTarget = 0; - sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0); - return target; - } - - const Member* primary = box.getPrimary(); - - // wait for 2N pings before choosing a sync target - if (_cfg) { - int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; - - if (needMorePings > 0) { - OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; - return NULL; - } - - // If we are only allowed to sync from the primary, return that - if (!_cfg->chainingAllowed()) { - // Returns NULL if we cannot reach the primary - return primary; - } - } - - // find the member with the lowest ping time that has more data than me - - // Find primary's oplog time. Reject sync candidates that are more than - // maxSyncSourceLagSecs seconds behind. - OpTime primaryOpTime; - if (primary) - primaryOpTime = primary->hbinfo().opTime; - else - // choose a time that will exclude no candidates, since we don't see a primary - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); - - if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) { - // erh - I think this means there was just a new election - // and we don't yet know the new primary's optime - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); - } - - OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0); - - Member *closest = 0; - time_t now = 0; - - // Make two attempts. The first attempt, we ignore those nodes with - // slave delay higher than our own. The second attempt includes such - // nodes, in case those are the only ones we can reach. - // This loop attempts to set 'closest'. - for (int attempts = 0; attempts < 2; ++attempts) { - for (Member *m = _members.head(); m; m = m->next()) { - if (!m->syncable()) - continue; - - if (m->state() == MemberState::RS_SECONDARY) { - // only consider secondaries that are ahead of where we are - if (m->hbinfo().opTime <= lastOpTimeWritten) - continue; - // omit secondaries that are excessively behind, on the first attempt at least. - if (attempts == 0 && - m->hbinfo().opTime < oldestSyncOpTime) - continue; - } - - // omit nodes that are more latent than anything we've already considered - if (closest && - (m->hbinfo().ping > closest->hbinfo().ping)) - continue; - - if (attempts == 0 && - (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) { - continue; // skip this one in the first attempt - } - - map<string,time_t>::iterator vetoed = _veto.find(m->fullName()); - if (vetoed != _veto.end()) { - // Do some veto housekeeping - if (now == 0) { - now = time(0); - } - - // if this was on the veto list, check if it was vetoed in the last "while". - // if it was, skip. - if (vetoed->second >= now) { - if (time(0) % 5 == 0) { - log() << "replSet not trying to sync from " << (*vetoed).first - << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog; - } - continue; - } - _veto.erase(vetoed); - // fall through, this is a valid candidate now - } - // This candidate has passed all tests; set 'closest' - closest = m; - } - if (closest) break; // no need for second attempt - } - - if (!closest) { - return NULL; - } - - sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); - - return closest; - } - - void ReplSetImpl::veto(const string& host, const Date_t until) { - lock lk(this); - _veto[host] = until.toTimeT(); - } - - Status ReplSetImpl::forceSyncFrom(const string& host, BSONObjBuilder* result) { - lock lk(this); - - // initial sanity check - if (iAmArbiterOnly()) { - return Status(ErrorCodes::NotSecondary, "arbiters don't sync"); - } - if (box.getState().primary()) { - return Status(ErrorCodes::NotSecondary, "primaries don't sync"); - } - if (_self != NULL && host == _self->fullName()) { - return Status(ErrorCodes::InvalidOptions, "I cannot sync from myself"); - } - - // find the member we want to sync from - Member *newTarget = 0; - for (Member *m = _members.head(); m; m = m->next()) { - if (m->fullName() == host) { - newTarget = m; - break; - } - } - - // do some more sanity checks - if (!newTarget) { - // this will also catch if someone tries to sync a member from itself, as _self is not - // included in the _members list. - return Status(ErrorCodes::NodeNotFound, "could not find member in replica set"); - } - if (newTarget->config().arbiterOnly) { - return Status(ErrorCodes::InvalidOptions, "I cannot sync from an arbiter"); - } - if (!newTarget->config().buildIndexes && myConfig().buildIndexes) { - return Status(ErrorCodes::InvalidOptions, - "I cannot sync from a member who does not build indexes"); - } - if (newTarget->hbinfo().authIssue) { - return Status(ErrorCodes::Unauthorized, - "not authorized to communicate with " + newTarget->fullName()); - } - if (newTarget->hbinfo().health == 0) { - return Status(ErrorCodes::HostUnreachable, "I cannot reach the requested member"); - } - if (newTarget->hbinfo().opTime.getSecs()+10 < lastOpTimeWritten.getSecs()) { - log() << "attempting to sync from " << newTarget->fullName() - << ", but its latest opTime is " << newTarget->hbinfo().opTime.getSecs() - << " and ours is " << lastOpTimeWritten.getSecs() << " so this may not work" - << rsLog; - result->append("warning", "requested member is more than 10 seconds behind us"); - // not returning false, just warning - } - - // record the previous member we were syncing from - const HostAndPort prev = BackgroundSync::get()->getSyncTarget(); - if (!prev.empty()) { - result->append("prevSyncTarget", prev.toString()); - } - - // finally, set the new target - _forceSyncTarget = newTarget; - return Status::OK(); - } - - bool ReplSetImpl::gotForceSync() { - lock lk(this); - return _forceSyncTarget != 0; - } - - bool ReplSetImpl::shouldChangeSyncTarget(const HostAndPort& currentTarget) { - lock lk(this); - OpTime targetOpTime = findByName(currentTarget.toString())->hbinfo().opTime; - for (Member *m = _members.head(); m; m = m->next()) { - if (m->syncable() && - targetOpTime.getSecs()+maxSyncSourceLagSecs < m->hbinfo().opTime.getSecs()) { - log() << "changing sync target because current sync target's most recent OpTime is " - << targetOpTime.toStringPretty() << " which is more than " - << maxSyncSourceLagSecs << " seconds behind member " << m->fullName() - << " whose most recent OpTime is " << m->hbinfo().opTime.getSecs(); - return true; - } - } - if (gotForceSync()) { - return true; - } - return false; - } - - void ReplSetImpl::clearVetoes() { - lock lk(this); - _veto.clear(); - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h deleted file mode 100644 index ecf288ef24b..00000000000 --- a/src/mongo/db/repl/repl_set_impl.h +++ /dev/null @@ -1,292 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/repl/consensus.h" -#include "mongo/db/repl/heartbeat_info.h" -#include "mongo/db/repl/manager.h" -#include "mongo/db/repl/member.h" -#include "mongo/db/repl/rs_base.h" -#include "mongo/db/repl/rs_config.h" -#include "mongo/db/repl/state_box.h" -#include "mongo/db/repl/sync_source_feedback.h" -#include "mongo/db/repl/sync_tail.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/concurrency/value.h" - -namespace mongo { - - class Cloner; - class OperationContext; - -namespace repl { - - struct FixUpInfo; - class OplogReader; - class ReplSetSeedList; - class ReplSetHealthPollTask; - - // information about the entire replset, such as the various servers in the set, and their state - /* note: We currently do not free mem when the set goes away - it is assumed the replset is a - singleton and long lived. - */ - class ReplSetImpl : protected RSBase { - public: - // info on our state if the replset isn't yet "up". for example, if we are pre-initiation. - enum StartupStatus { - PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, - EMPTYUNREACHABLE=4, STARTED=5, SOON=6 - }; - static StartupStatus startupStatus; - static DiagStr startupStatusMsg; - static string stateAsHtml(MemberState state); - - /* todo thread */ - void msgUpdateHBInfo(HeartbeatInfo); - - /** - * Updates the lastHeartbeatRecv of Member with the given id. - */ - void msgUpdateHBRecv(unsigned id, time_t newTime); - - void electCmdReceived(const StringData& set, - unsigned whoid, - int cfgver, - const OID& round, - BSONObjBuilder* result) { - elect.electCmdReceived(set, whoid, cfgver, round, result); - } - - StateBox box; - - SyncSourceFeedback syncSourceFeedback; - - OpTime lastOpTimeWritten; - - Status forceSyncFrom(const string& host, BSONObjBuilder* result); - // Check if the current sync target is suboptimal. This must be called while holding a mutex - // that prevents the sync source from changing. - bool shouldChangeSyncTarget(const HostAndPort& target); - - /** - * Find the closest member (using ping time) with a higher latest optime. - */ - const Member* getMemberToSyncTo(); - void veto(const string& host, Date_t until); - bool gotForceSync(); - void goStale(OperationContext* txn, const Member* m, const BSONObj& o); - - OID getElectionId() const { return elect.getElectionId(); } - OpTime getElectionTime() const { return elect.getElectionTime(); } - - void loadLastOpTimeWritten(OperationContext* txn, bool quiet = false); - private: - set<ReplSetHealthPollTask*> healthTasks; - void endOldHealthTasks(); - void startHealthTaskFor(Member *m); - - Consensus elect; - void relinquish(OperationContext* txn); - void forgetPrimary(OperationContext* txn); - protected: - bool _stepDown(OperationContext* txn, int secs); - bool _freeze(int secs); - private: - void _assumePrimary(); - void changeState(MemberState s); - - Member* _forceSyncTarget; - - // set of electable members' _ids - set<unsigned> _electableSet; - protected: - // "heartbeat message" - // sent in requestHeartbeat respond in field "hbm" - char _hbmsg[256]; // we change this unlocked, thus not an stl::string - time_t _hbmsgTime; // when it was logged - public: - void sethbmsg(const std::string& s, int logLevel = 0); - - /** - * Election with Priorities - * - * Each node (n) keeps a set of nodes that could be elected primary. - * Each node in this set: - * - * 1. can connect to a majority of the set - * 2. has a priority greater than 0 - * 3. has an optime within 10 seconds of the most up-to-date node - * that n can reach - * - * If a node fails to meet one or more of these criteria, it is removed - * from the list. This list is updated whenever the node receives a - * heartbeat. - * - * When a node sends an "am I freshest?" query, the node receiving the - * query checks their electable list to make sure that no one else is - * electable AND higher priority. If this check passes, the node will - * return an "ok" response, if not, it will veto. - * - * If a node is primary and there is another node with higher priority - * on the electable list (i.e., it must be synced to within 10 seconds - * of the current primary), the node (or nodes) with connections to both - * the primary and the secondary with higher priority will issue - * replSetStepDown requests to the primary to allow the higher-priority - * node to take over. - */ - void addToElectable(const unsigned m) { lock lk(this); _electableSet.insert(m); } - void rmFromElectable(const unsigned m) { lock lk(this); _electableSet.erase(m); } - bool iAmElectable() { - lock lk(this); - return _electableSet.find(_self->id()) != _electableSet.end(); - } - bool isElectable(const unsigned id) { - lock lk(this); - return _electableSet.find(id) != _electableSet.end(); - } - Member* getMostElectable(); - protected: - /** - * Load a new config as the replica set's main config. - * - * If there is a "simple" change (just adding a node), this shortcuts - * the config. Returns true if the config was changed. Returns false - * if the config doesn't include a this node. Throws an exception if - * something goes very wrong. - * - * Behavior to note: - * - locks this - * - intentionally leaks the old _cfg and any old _members (if the - * change isn't strictly additive) - */ - bool initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf = false); - void _fillIsMaster(BSONObjBuilder&); - void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); - const ReplSetConfig& config() { return *_cfg; } - string name() const { return _name; } /* @return replica set's logical name */ - int version() const { return _cfg->version; } /* @return replica set's config version */ - MemberState state() const { return box.getState(); } - void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const; - void _summarizeAsHtml(OperationContext* txn, stringstream&) const; - void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command - - /* call afer constructing to start - returns fairly quickly after launching its threads */ - void _go(); - - private: - string _name; - const vector<HostAndPort> *_seeds; - ReplSetConfig *_cfg; - - /** - * Finds the configuration with the highest version number and attempts - * load it. - */ - bool _loadConfigFinish(OperationContext* txn, vector<ReplSetConfig*>& v); - /** - * Gather all possible configs (from command line seeds, our own config - * doc, and any hosts listed therein) and try to initiate from the most - * recent config we find. - */ - void loadConfig(OperationContext* txn); - - list<HostAndPort> memberHostnames() const; - bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } - bool iAmPotentiallyHot() const { - return myConfig().potentiallyHot() && // not an arbiter - elect.steppedDown <= time(0) && // not stepped down/frozen - state() == MemberState::RS_SECONDARY; // not stale - } - protected: - Member *_self; - bool _buildIndexes; // = _self->config().buildIndexes - - ReplSetImpl(); - /* throws exception if a problem initializing. */ - void init(OperationContext* txn, ReplSetSeedList&); - - void setSelfTo(Member *); // use this as it sets buildIndexes var - private: - List1<Member> _members; // all members of the set EXCEPT _self. - ReplSetConfig::MemberCfg _config; // config of _self - unsigned _id; // _id of _self - - int _maintenanceMode; // if we should stay in recovering state - public: - // this is called from within a writelock in logOpRS - unsigned selfId() const { return _id; } - Manager *mgr; - - private: - Member* head() const { return _members.head(); } - public: - const Member* findById(unsigned id) const; - Member* getMutableMember(unsigned id); - Member* findByName(const std::string& hostname) const; - - // Clears the vetoes (blacklisted sync sources) - void clearVetoes(); - - private: - void _getTargets(list<Target>&, int &configVersion); - void getTargets(list<Target>&, int &configVersion); - void startThreads(); - friend class LegacyReplicationCoordinator; - friend class Member; - friend class Manager; - friend class Consensus; - - private: - bool _initialSyncClone(OperationContext* txn, - Cloner &cloner, - const std::string& host, - const list<string>& dbs, - bool dataPass); - bool _initialSyncApplyOplog(OperationContext* txn, - repl::SyncTail& syncer, - OplogReader* r, - const Member* source); - void _initialSync(); - void syncTail(); - - public: - // keep a list of hosts that we've tried recently that didn't work - map<string,time_t> _veto; - - const ReplSetConfig::MemberCfg& myConfig() const { return _config; } - const OpTime lastOtherOpTime() const; - /** - * The most up to date electable replica - */ - const OpTime lastOtherElectableOpTime() const; - - BSONObj getLastErrorDefault; - }; -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/replication_initializer_impl.cpp b/src/mongo/db/repl/replication_initializer_impl.cpp deleted file mode 100644 index 8e462e6ddee..00000000000 --- a/src/mongo/db/repl/replication_initializer_impl.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/base/init.h" -#include "mongo/base/status.h" -#include "mongo/db/mongod_options.h" -#include "mongo/db/repl/repl_coordinator_external_state_impl.h" -#include "mongo/db/repl/repl_coordinator_impl.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/repl_settings.h" -#include "mongo/db/repl/network_interface_impl.h" -#include "mongo/db/repl/topology_coordinator_impl.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace { - -MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnvironment")) - (InitializerContext* context) { - repl::setGlobalReplicationCoordinator(new repl::ReplicationCoordinatorImpl( - getGlobalReplSettings(), - new repl::ReplicationCoordinatorExternalStateImpl, - new repl::NetworkInterfaceImpl, - new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)), - static_cast<int64_t>(curTimeMillis64()))); - return Status::OK(); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/repl/replication_initializer_legacy.cpp b/src/mongo/db/repl/replication_initializer_legacy.cpp deleted file mode 100644 index 6856ad9c81b..00000000000 --- a/src/mongo/db/repl/replication_initializer_legacy.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/base/init.h" -#include "mongo/db/mongod_options.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/repl_coordinator_legacy.h" -#include "mongo/db/repl/repl_settings.h" - -namespace mongo { -namespace { - -MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, ("SetGlobalEnvironment")) - (InitializerContext* context) { - repl::setGlobalReplicationCoordinator( - new repl::LegacyReplicationCoordinator(getGlobalReplSettings())); - return Status::OK(); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 723f6961f45..88d0d3083e3 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -38,7 +38,6 @@ #include "mongo/db/commands.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/global_environment_experiment.h" -#include "mongo/db/repl/connections.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" @@ -47,10 +46,9 @@ #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_set_seed_list.h" #include "mongo/db/repl/replset_commands.h" -#include "mongo/db/repl/rs_config.h" #include "mongo/db/repl/rslog.h" +#include "mongo/db/repl/scoped_conn.h" #include "mongo/db/repl/update_position_args.h" -#include "mongo/db/repl/write_concern.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/repl/replset_commands.h b/src/mongo/db/repl/replset_commands.h index b015a37ca89..0d05081418f 100644 --- a/src/mongo/db/repl/replset_commands.h +++ b/src/mongo/db/repl/replset_commands.h @@ -32,7 +32,6 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/commands.h" -#include "mongo/db/repl/rs.h" namespace mongo { namespace repl { diff --git a/src/mongo/db/repl/replset_web_handler.cpp b/src/mongo/db/repl/replset_web_handler.cpp deleted file mode 100644 index fb48217bfa2..00000000000 --- a/src/mongo/db/repl/replset_web_handler.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include "mongo/platform/basic.h" - -#include "mongo/db/dbwebserver.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/repl/health.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/util/mongoutils/html.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { -namespace repl { - - using namespace html; - - class ReplSetHandler : public DbWebHandler { - public: - ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ) {} - - virtual bool handles( const string& url ) const { - return startsWith( url , "/_replSet" ); - } - - virtual void handle( OperationContext* txn, - const char *rq, - const std::string& url, - BSONObj params, - string& responseMsg, - int& responseCode, - vector<string>& headers, - const SockAddr &from ) { - - if( url == "/_replSetOplog" ) { - responseMsg = _replSetOplog(params); - } - else - responseMsg = _replSet(txn); - responseCode = 200; - } - - string _replSetOplog(bo parms) { - int _id = (int) str::toUnsigned( parms["_id"].String() ); - - stringstream s; - string t = "Replication oplog"; - s << start(t); - s << p(t); - - if( theReplSet == 0 ) { - if (getGlobalReplicationCoordinator()->getSettings().replSet.empty()) - s << p("Not using --replSet"); - else { - s << p("Still starting up, or else set is not yet " + a("http://dochub.mongodb.org/core/replicasetconfiguration#ReplicaSetConfiguration-InitialSetup", "", "initiated") - + ".<br>" + ReplSet::startupStatusMsg.get()); - } - } - else { - try { - theReplSet->getOplogDiagsAsHtml(_id, s); - } - catch(std::exception& e) { - s << "error querying oplog: " << e.what() << '\n'; - } - } - - s << _end(); - return s.str(); - } - - /* /_replSet show replica set status in html format */ - string _replSet(OperationContext* txn) { - stringstream s; - s << start("Replica Set Status " + prettyHostName()); - s << p( a("/", "back", "Home") + " | " + - a("/local/system.replset/?html=1", "", "View Replset Config") + " | " + - a("/replSetGetStatus?text=1", "", "replSetGetStatus") + " | " + - a("http://dochub.mongodb.org/core/replicasets", "", "Docs") - ); - - if( theReplSet == 0 ) { - if (getGlobalReplicationCoordinator()->getSettings().replSet.empty()) - s << p("Not using --replSet"); - else { - s << p("Still starting up, or else set is not yet " + a("http://dochub.mongodb.org/core/replicasetconfiguration#ReplicaSetConfiguration-InitialSetup", "", "initiated") - + ".<br>" + ReplSet::startupStatusMsg.get()); - } - } - else { - try { - theReplSet->summarizeAsHtml(txn, s); - } - catch(...) { s << "error summarizing replset status\n"; } - } - s << p("Recent replset log activity:"); - fillRsLog(&s); - s << _end(); - return s.str(); - } - - - - } replSetHandler; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/resync.cpp b/src/mongo/db/repl/resync.cpp index a1703987676..fc47875005b 100644 --- a/src/mongo/db/repl/resync.cpp +++ b/src/mongo/db/repl/resync.cpp @@ -30,7 +30,6 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/master_slave.h" // replSettings #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" // replLocalAuth() #include "mongo/db/operation_context.h" namespace mongo { diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp deleted file mode 100644 index 4660e7cb4d1..00000000000 --- a/src/mongo/db/repl/rs.cpp +++ /dev/null @@ -1,133 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/rs.h" - -#include "mongo/base/status.h" -#include "mongo/db/audit.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/client.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/repl_set_impl.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/db/server_parameters.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace repl { - - ReplSet *theReplSet = 0; - - // This is a bitmask with the first bit set. It's used to mark connections that should be kept - // open during stepdowns - const unsigned ScopedConn::keepOpen = 1; - - ReplSet::ReplSet() { - } - - ReplSet* ReplSet::make(OperationContext* txn, ReplSetSeedList& replSetSeedList) { - auto_ptr<ReplSet> ret(new ReplSet()); - ret->init(txn, replSetSeedList); - return ret.release(); - } - - ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; - DiagStr ReplSetImpl::startupStatusMsg; - - void ReplSet::haveNewConfig(OperationContext* txn, ReplSetConfig& newConfig, bool addComment) { - bo comment; - if( addComment ) - comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); - - newConfig.saveConfigLocally(txn, comment); - - try { - BSONObj oldConfForAudit = config().asBson(); - BSONObj newConfForAudit = newConfig.asBson(); - audit::logReplSetReconfig(ClientBasic::getCurrent(), - &oldConfForAudit, - &newConfForAudit); - if (initFromConfig(txn, newConfig, true)) { - log() << "replSet replSetReconfig new config saved locally" << rsLog; - } - } - catch (const DBException& e) { - log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog; - fassertFailedNoTrace(18755); - } - catch (...) { - std::terminate(); - } - } - - void Manager::msgReceivedNewConfig(BSONObj o) { - OperationContextImpl txn; - - log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog; - scoped_ptr<ReplSetConfig> config(ReplSetConfig::make(&txn, o)); - if( config->version > rs->config().version ) - theReplSet->haveNewConfig(&txn, *config, false); - else { - log() << "replSet info msgReceivedNewConfig but version isn't higher " << - config->version << ' ' << rs->config().version << rsLog; - } - } - - /* forked as a thread during startup - it can run quite a while looking for config. but once found, - a separate thread takes over as ReplSetImpl::Manager, and this thread - terminates. - */ - void startReplSets(ReplSetSeedList *replSetSeedList) { - Client::initThread("rsStart"); - OperationContextImpl txn; - - try { - verify( theReplSet == 0 ); - if( replSetSeedList == 0 ) { - return; - } - cc().getAuthorizationSession()->grantInternalAuthorization(); - (theReplSet = ReplSet::make(&txn, *replSetSeedList))->go(); - } - catch(std::exception& e) { - log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; - if( theReplSet ) - fassertFailedNoTrace(18756); - } - cc().shutdown(); - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h deleted file mode 100644 index 09d2561089f..00000000000 --- a/src/mongo/db/repl/rs.h +++ /dev/null @@ -1,76 +0,0 @@ -// /db/repl/rs.h - -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/bson/oid.h" -#include "mongo/bson/optime.h" -#include "mongo/db/commands.h" -#include "mongo/db/index/index_descriptor.h" -#include "mongo/db/repl/consensus.h" -#include "mongo/db/repl/manager.h" -#include "mongo/db/repl/oplogreader.h" -#include "mongo/db/repl/repl_set.h" -#include "mongo/db/repl/repl_set_impl.h" -#include "mongo/db/repl/rs_base.h" -#include "mongo/db/repl/rs_config.h" -#include "mongo/db/repl/rs_exception.h" -#include "mongo/db/repl/rs_sync.h" -#include "mongo/db/repl/server.h" -#include "mongo/db/repl/state_box.h" -#include "mongo/db/repl/sync_source_feedback.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/concurrency/value.h" -#include "mongo/util/net/hostandport.h" - -/** - * Order of Events - * - * On startup, if the --replSet option is present, startReplSets is called. - * startReplSets forks off a new thread for replica set activities. It creates - * the global theReplSet variable and calls go() on it. - * - * theReplSet's constructor changes the replica set's state to RS_STARTUP, - * starts the replica set manager, and loads the config (if the replica set - * has been initialized). - */ - -namespace mongo { -namespace repl { - - extern class ReplSet *theReplSet; // null until initialized - - class ReplSetSeedList; - - // Main entry point for replica sets - void startReplSets(ReplSetSeedList *replSetSeedList); - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp deleted file mode 100644 index c595580159e..00000000000 --- a/src/mongo/db/repl/rs_config.cpp +++ /dev/null @@ -1,736 +0,0 @@ -// rs_config.cpp - -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include <boost/algorithm/string.hpp> - -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/repl/connections.h" -#include "mongo/db/repl/heartbeat.h" -#include "mongo/db/repl/isself.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/repl/rslog.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/util/log.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/text.h" - -namespace mongo { - -namespace repl { - - mongo::mutex ReplSetConfig::groupMx("RS tag group"); - const int ReplSetConfig::DEFAULT_HB_TIMEOUT = 10; - -namespace { - AtomicUInt32 _warnedAboutVotes; - - void assertOnlyHas(BSONObj o, const set<string>& fields) { - BSONObj::iterator i(o); - while( i.more() ) { - BSONElement e = i.next(); - if( !fields.count( e.fieldName() ) ) { - uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "' in object"); - } - } - } -} // namespace - - list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { - list<HostAndPort> L; - for (vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++) { - if (!isSelf(i->h)) { - L.push_back(i->h); - } - } - return L; - } - - /* comment MUST only be set when initiating the set by the initiator */ - void ReplSetConfig::saveConfigLocally(OperationContext* txn, bo comment) { - checkRsConfig(); - - BSONObj newConfigBSON = asBson(); - - log() << "replSet info saving a newer config version to " << rsConfigNs << ": " - << newConfigBSON << rsLog; - { - Lock::DBLock lk(txn->lockState(), NamespaceString(rsConfigNs).db(), MODE_X); - WriteUnitOfWork uow(txn); - - Helpers::putSingletonGod(txn, - rsConfigNs.c_str(), - newConfigBSON, - false/*logOp=false; local db so would work regardless...*/); - if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) ) - logOpInitiate(txn, comment); - uow.commit(); - } - log() << "replSet saveConfigLocally done" << rsLog; - } - - bo ReplSetConfig::MemberCfg::asBson() const { - bob b; - b << "_id" << _id; - b.append("host", h.toString()); - if( votes != 1 ) b << "votes" << votes; - if( priority != 1.0 ) b << "priority" << priority; - if( arbiterOnly ) b << "arbiterOnly" << true; - if( slaveDelay ) b << "slaveDelay" << slaveDelay; - if( hidden ) b << "hidden" << hidden; - if( !buildIndexes ) b << "buildIndexes" << buildIndexes; - if( !tags.empty() ) { - BSONObjBuilder a; - for( map<string,string>::const_iterator i = tags.begin(); i != tags.end(); i++ ) - a.append((*i).first, (*i).second); - b.append("tags", a.done()); - } - return b.obj(); - } - - void ReplSetConfig::updateMembers(List1<Member> &dest) const { - for (vector<MemberCfg>::const_iterator source = members.begin(); source < members.end(); source++) { - for( Member *d = dest.head(); d; d = d->next() ) { - if (d->fullName() == (*source).h.toString()) { - scoped_lock lk(groupMx); - d->configw().groupsw() = (*source).groups(); - } - } - } - } - - bo ReplSetConfig::asBson() const { - bob b; - b.append("_id", _id).append("version", version); - - BSONArrayBuilder a; - for( unsigned i = 0; i < members.size(); i++ ) - a.append( members[i].asBson() ); - b.append("members", a.arr()); - - BSONObjBuilder settings; - bool empty = true; - - if (!rules.empty()) { - bob modes; - for (map<string,TagRule*>::const_iterator it = rules.begin(); it != rules.end(); it++) { - bob clauses; - vector<TagClause*> r = (*it).second->clauses; - for (vector<TagClause*>::iterator it2 = r.begin(); it2 < r.end(); it2++) { - clauses << (*it2)->name << (*it2)->target; - } - modes << (*it).first << clauses.obj(); - } - settings << "getLastErrorModes" << modes.obj(); - empty = false; - } - - if (!getLastErrorDefaults.isEmpty()) { - settings << "getLastErrorDefaults" << getLastErrorDefaults; - empty = false; - } - - if (_heartbeatTimeout != DEFAULT_HB_TIMEOUT) { - settings << "heartbeatTimeoutSecs" << _heartbeatTimeout; - empty = false; - } - - if (!_chainingAllowed) { - settings << "chainingAllowed" << _chainingAllowed; - empty = false; - } - - if (!empty) { - b << "settings" << settings.obj(); - } - - return b.obj(); - } - - static inline void mchk(bool expr) { - uassert(13126, "bad Member config", expr); - } - - void ReplSetConfig::MemberCfg::check() const { - mchk(_id >= 0 && _id <= 255); - mchk(priority >= 0 && priority <= 1000); - mchk(votes <= 100); // votes >= 0 because it is unsigned - uassert(13419, "priorities must be between 0.0 and 1000", priority >= 0.0 && priority <= 1000); - uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0); - uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366); - uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden); - uassert(13477, "priority must be 0 when buildIndexes=false", buildIndexes || priority == 0); - uassert(17492, "arbiter must vote (cannot have 0 votes)", !arbiterOnly || votes > 0); - } - void ReplSetConfig::TagSubgroup::updateLast(const OpTime& op) { - if (last < op) { - last = op; - - for (vector<TagClause*>::iterator it = clauses.begin(); it < clauses.end(); it++) { - (*it)->updateLast(op); - } - } - } - - void ReplSetConfig::TagClause::updateLast(const OpTime& op) { - if (last >= op) { - return; - } - - // check at least n subgroups greater than clause.last - int count = 0; - map<string,TagSubgroup*>::iterator it; - for (it = subgroups.begin(); it != subgroups.end(); it++) { - if ((*it).second->last >= op) { - count++; - } - } - - if (count >= actualTarget) { - last = op; - rule->updateLast(op); - } - } - - void ReplSetConfig::TagRule::updateLast(const OpTime& op) { - OpTime *earliest = (OpTime*)&op; - vector<TagClause*>::iterator it; - - for (it = clauses.begin(); it < clauses.end(); it++) { - if ((*it)->last < *earliest) { - earliest = &(*it)->last; - } - } - - // rules are simply and-ed clauses, so whatever the most-behind - // clause is at is what the rule is at - last = *earliest; - } - - /** @param o old config - @param n new config - */ - /*static*/ - Status ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n) { - verify( theReplSet ); - - if( o._id != n._id ) { - return Status(ErrorCodes::InvalidReplicaSetConfig, "set name may not change"); - } - /* TODO : wonder if we need to allow o.version < n.version only, which is more lenient. - if someone had some intermediate config this node doesnt have, that could be - necessary. but then how did we become primary? so perhaps we are fine as-is. - */ - if( o.version >= n.version ) { - return Status(ErrorCodes::InvalidReplicaSetConfig, - str::stream() << "version number must increase, old: " - << o.version << " new: " << n.version); - } - - map<HostAndPort,const ReplSetConfig::MemberCfg*> old; - bool isLocalHost = false; - for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { - if (i->h.isLocalHost()) { - isLocalHost = true; - } - old[i->h] = &(*i); - } - int me = 0; - for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { - const ReplSetConfig::MemberCfg& m = *i; - if ( (isLocalHost && !m.h.isLocalHost()) || (!isLocalHost && m.h.isLocalHost())) { - log() << "reconfig error, cannot switch between localhost and hostnames: " - << m.h.toString() << rsLog; - uasserted(13645, "hosts cannot switch between localhost and hostname"); - } - if( old.count(m.h) ) { - const ReplSetConfig::MemberCfg& oldCfg = *old[m.h]; - if( oldCfg._id != m._id ) { - log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; - uasserted(13432, "_id may not change for members"); - } - if( oldCfg.buildIndexes != m.buildIndexes ) { - log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; - uasserted(13476, "buildIndexes may not change for members"); - } - /* are transitions to and from arbiterOnly guaranteed safe? if not, we should disallow here. - there is a test at replsets/replsetarb3.js */ - if( oldCfg.arbiterOnly != m.arbiterOnly ) { - log() << "replSet reconfig error with member: " << m.h.toString() << " arbiterOnly cannot change. remove and readd the member instead " << rsLog; - uasserted(13510, "arbiterOnly may not change for members"); - } - } - if (isSelf(m.h)) { - me++; - } - } - - uassert(13433, "can't find self in new replset config", me == 1); - - return Status::OK(); - } - - void ReplSetConfig::clear() { - version = -5; - _ok = false; - } - - void ReplSetConfig::setMajority() { - int total = members.size(); - int nonArbiters = total; - int strictMajority = total/2+1; - - for (vector<MemberCfg>::iterator it = members.begin(); it < members.end(); it++) { - if ((*it).arbiterOnly) { - nonArbiters--; - } - } - - // majority should be all "normal" members if we have something like 4 - // arbiters & 3 normal members - _majority = (strictMajority > nonArbiters) ? nonArbiters : strictMajority; - } - - int ReplSetConfig::getMajority() const { - return _majority; - } - - void ReplSetConfig::checkRsConfig() const { - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - uassert(13132, - str::stream() << "nonmatching repl set name in _id field: " << _id << " vs. " - << replSettings.ourSetName(), - _id == replSettings.ourSetName()); - uassert(13308, "replSet bad config version #", version > 0); - uassert(13133, "replSet bad config no members", members.size() >= 1); - uassert(13309, "replSet bad config maximum number of members is 12", members.size() <= 12); - if (!getLastErrorDefaults.isEmpty() && getLastErrorDefaults.hasField("w") - && getLastErrorDefaults["w"].isNumber()) { - uassert(17505, "replSet illegal config: getLastErrorDefaults w:0", - getLastErrorDefaults["w"].safeNumberLong() != 0); - } - { - unsigned voters = 0; - for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); ++i ) { - if( i->votes ) - voters++; - } - uassert(13612, "replSet bad config maximum number of voting members is 7", voters <= 7); - uassert(13613, "replSet bad config no voting members", voters > 0); - } - } - - void ReplSetConfig::_populateTagMap(map<string,TagClause> &tagMap) { - // create subgroups for each server corresponding to each of - // its tags. E.g.: - // - // A is tagged with {"server" : "A", "dc" : "ny"} - // B is tagged with {"server" : "B", "dc" : "ny"} - // - // At the end of this step, tagMap will contain: - // - // "server" => {"A" : [A], "B" : [B]} - // "dc" => {"ny" : [A,B]} - - for (unsigned i=0; i<members.size(); i++) { - MemberCfg member = members[i]; - - for (map<string,string>::iterator tag = member.tags.begin(); tag != member.tags.end(); tag++) { - string label = (*tag).first; - string value = (*tag).second; - - TagClause& clause = tagMap[label]; - clause.name = label; - - TagSubgroup* subgroup; - // search for "ny" in "dc"'s clause - if (clause.subgroups.find(value) == clause.subgroups.end()) { - clause.subgroups[value] = subgroup = new TagSubgroup(value); - } - else { - subgroup = clause.subgroups[value]; - } - - subgroup->m.insert(&members[i]); - } - } - } - - void ReplSetConfig::parseRules(const BSONObj& modes) { - map<string,TagClause> tagMap; - _populateTagMap(tagMap); - - for (BSONObj::iterator i = modes.begin(); i.more(); ) { - unsigned int primaryOnly = 0; - - // ruleName : {dc : 2, m : 3} - BSONElement rule = i.next(); - uassert(14046, "getLastErrorMode rules must be objects", rule.type() == mongo::Object); - - TagRule* r = new TagRule(); - - BSONObj clauseObj = rule.Obj(); - for (BSONObj::iterator c = clauseObj.begin(); c.more(); ) { - BSONElement clauseElem = c.next(); - uassert(14829, "getLastErrorMode criteria must be numeric", clauseElem.isNumber()); - - // get the clause, e.g., "x.y" : 3 - const char *criteria = clauseElem.fieldName(); - int value = clauseElem.numberInt(); - uassert(14828, str::stream() << "getLastErrorMode criteria must be greater than 0: " << clauseElem, value > 0); - - TagClause* node = new TagClause(tagMap[criteria]); - - int numGroups = node->subgroups.size(); - uassert(14831, str::stream() << "mode " << clauseObj << " requires " - << value << " tagged with " << criteria << ", but only " - << numGroups << " with this tag were found", numGroups >= value); - - node->name = criteria; - node->target = value; - // if any subgroups contain "me", we can decrease the target - node->actualTarget = node->target; - - // then we want to add pointers between clause & subgroup - for (map<string,TagSubgroup*>::iterator sgs = node->subgroups.begin(); - sgs != node->subgroups.end(); sgs++) { - bool foundMe = false; - (*sgs).second->clauses.push_back(node); - - // if this subgroup contains the primary, it's automatically always up-to-date - for( set<MemberCfg*>::const_iterator cfg = (*sgs).second->m.begin(); - cfg != (*sgs).second->m.end(); - cfg++) - { - if (isSelf((*cfg)->h)) { - node->actualTarget--; - foundMe = true; - } - } - - scoped_lock lk(groupMx); - for (set<MemberCfg *>::iterator cfg = (*sgs).second->m.begin(); - !foundMe && cfg != (*sgs).second->m.end(); cfg++) { - (*cfg)->groupsw().insert((*sgs).second); - } - } - - // if all of the members of this clause involve the primary, it's always up-to-date - if (node->actualTarget == 0) { - node->last = OpTime(INT_MAX, INT_MAX); - primaryOnly++; - } - - // this is a valid clause, so we want to add it to its rule - node->rule = r; - r->clauses.push_back(node); - } - - // if all of the clauses are satisfied by the primary, this rule is trivially true - if (primaryOnly == r->clauses.size()) { - r->last = OpTime(INT_MAX, INT_MAX); - } - - // if we got here, this is a valid rule - rules[rule.fieldName()] = r; - } - } - - void ReplSetConfig::from(BSONObj o) { - static const string legal[] = {"_id","version", "members","settings"}; - static const set<string> legals(legal, legal + 4); - assertOnlyHas(o, legals); - - _id = o["_id"].String(); - if( o["version"].ok() ) { - version = o["version"].numberInt(); - uassert(13115, "bad " + rsConfigNs + " config: version", version > 0); - } - - set<string> hosts; - set<int> ords; - vector<BSONElement> members; - try { - members = o["members"].Array(); - } - catch(...) { - uasserted(13131, "replSet error parsing (or missing) 'members' field in config object"); - } - - unsigned localhosts = 0; - for( unsigned i = 0; i < members.size(); i++ ) { - BSONObj mobj = members[i].Obj(); - MemberCfg m; - try { - static const string legal[] = { - "_id","votes","priority","host", "hidden","slaveDelay", - "arbiterOnly","buildIndexes","tags","initialSync" // deprecated - }; - static const set<string> legals(legal, legal + 10); - assertOnlyHas(mobj, legals); - - uassert(18519, "_id must be numeric", mobj["_id"].isNumber()); - m._id = mobj["_id"].numberInt(); - - try { - string s = mobj["host"].String(); - boost::trim(s); - m.h = HostAndPort(s); - if ( !m.h.hasPort() ) { - // make port explicit even if default - m.h = HostAndPort(m.h.host(), m.h.port()); - } - } - catch (const DBException& e) { - uasserted(18520, - mongoutils::str::stream() << - "bad or missing host field in member config object " << - mobj.toString() << causedBy(e)); - } - if( m.h.isLocalHost() ) - localhosts++; - m.arbiterOnly = mobj["arbiterOnly"].trueValue(); - m.slaveDelay = mobj["slaveDelay"].numberInt(); - if( mobj.hasElement("hidden") ) - m.hidden = mobj["hidden"].trueValue(); - if( mobj.hasElement("buildIndexes") ) - m.buildIndexes = mobj["buildIndexes"].trueValue(); - if( mobj.hasElement("priority") ) - m.priority = mobj["priority"].Number(); - if( mobj.hasElement("votes") ) - m.votes = (unsigned) mobj["votes"].Number(); - if (m.votes > 1 && (_warnedAboutVotes.load() == 0)) { - log() << "\t\tWARNING: Having more than 1 vote on a single replicaset member is" - << startupWarningsLog; - log() << "\t\tdeprecated, as it causes issues with majority write concern. For" - << startupWarningsLog; - log() << "\t\tmore information, see " - << "http://dochub.mongodb.org/core/replica-set-votes-deprecated" - << startupWarningsLog; - _warnedAboutVotes.store(1); - } - if( mobj.hasElement("tags") ) { - const BSONObj &t = mobj["tags"].Obj(); - for (BSONObj::iterator c = t.begin(); c.more(); c.next()) { - m.tags[(*c).fieldName()] = (*c).String(); - } - uassert(14827, "arbiters cannot have tags", !m.arbiterOnly || m.tags.empty() ); - } - m.check(); - } - catch( const char * p ) { - log() << "replSet cfg parsing exception for members[" << i << "] " << p << rsLog; - stringstream ss; - ss << "replSet members[" << i << "] " << p; - uassert(13107, ss.str(), false); - } - catch(DBException& e) { - log() << "replSet cfg parsing exception for members[" << i << "] " << e.what() << rsLog; - stringstream ss; - ss << "bad config for member[" << i << "] " << e.what(); - uassert(13135, ss.str(), false); - } - if( !(ords.count(m._id) == 0 && hosts.count(m.h.toString()) == 0) ) { - log() << "replSet " << o.toString() << rsLog; - uassert(13108, "bad replset config -- duplicate hosts in the config object?", false); - } - hosts.insert(m.h.toString()); - ords.insert(m._id); - this->members.push_back(m); - } - uassert(13393, "can't use localhost in repl set member names except when using it for all members", localhosts == 0 || localhosts == members.size()); - uassert(13117, "bad " + rsConfigNs + " config", !_id.empty()); - - if( o["settings"].ok() ) { - BSONObj settings = o["settings"].Obj(); - if( settings["getLastErrorModes"].ok() ) { - parseRules(settings["getLastErrorModes"].Obj()); - } - try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } - catch(...) { } - - if (settings.hasField("heartbeatTimeoutSecs")) { - int timeout = settings["heartbeatTimeoutSecs"].numberInt(); - uassert(16438, "Heartbeat timeout must be non-negative", timeout >= 0); - _heartbeatTimeout = timeout; - } - - // If the config explicitly sets chaining to false, turn it off. - if (settings.hasField("chainingAllowed") && - !settings["chainingAllowed"].trueValue()) { - _chainingAllowed = false; - } - } - - // figure out the majority for this config - setMajority(); - } - - bool ReplSetConfig::chainingAllowed() const { - return _chainingAllowed; - } - - int ReplSetConfig::getHeartbeatTimeout() const { - return _heartbeatTimeout; - } - - ReplSetConfig::ReplSetConfig() : - version(EMPTYCONFIG), - _chainingAllowed(true), - _majority(-1), - _ok(false), - _heartbeatTimeout(DEFAULT_HB_TIMEOUT) { - } - - ReplSetConfig* ReplSetConfig::make(OperationContext* txn, BSONObj cfg, bool force) { - auto_ptr<ReplSetConfig> ret(new ReplSetConfig()); - ret->init(txn, cfg, force); - return ret.release(); - } - - void ReplSetConfig::init(OperationContext* txn, BSONObj cfg, bool force) { - clear(); - from(cfg); - if( force ) { - version += rand() % 100000 + 10000; - } - uassert(13122, "bad repl set config?", version < 0 /*unspecified*/ || (version >= 1) ); - if( version < 1 ) - version = 1; - _ok = true; - } - - ReplSetConfig* ReplSetConfig::make(OperationContext* txn, const HostAndPort& h) { - auto_ptr<ReplSetConfig> ret(new ReplSetConfig()); - ret->init(txn, h); - return ret.release(); - } - - ReplSetConfig* ReplSetConfig::makeDirect(OperationContext* txn) { - DBDirectClient cli(txn); - BSONObj config = cli.findOne(rsConfigNs, Query()).getOwned(); - - // Check for no local config - if (config.isEmpty()) { - return new ReplSetConfig(); - } - - return make(txn, config, false); - } - - void ReplSetConfig::init(OperationContext* txn, const HostAndPort& h) { - LOG(2) << "ReplSetConfig load " << h.toString() << rsLog; - - clear(); - int level = 2; - DEV level = 0; - - BSONObj cfg; - int v = -5; - try { - if (isSelf(h)) { - ; - } - else { - /* first, make sure other node is configured to be a replset. just to be safe. */ - string setname = getGlobalReplicationCoordinator()->getSettings().ourSetName(); - BSONObj cmd = BSON( "replSetHeartbeat" << setname ); - int theirVersion; - BSONObj info; - log() << "trying to contact " << h.toString() << rsLog; - bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion); - if( info["rs"].trueValue() ) { - // yes, it is a replicate set, although perhaps not yet initialized - } - else { - if( !ok ) { - log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog; - if( !info.isEmpty() ) - log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog; - return; - } - { - stringstream ss; - ss << "replSet error: member " << h.toString() << " is not in --replSet mode"; - msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught - //for python err# checker: uassert(13260, "", false); - } - } - } - - v = -4; - unsigned long long count = 0; - try { - ScopedConn conn(h.toString()); - v = -3; - cfg = conn.findOne(rsConfigNs, Query()).getOwned(); - count = conn.count(rsConfigNs); - } - catch ( DBException& ) { - if (!isSelf(h)) { - throw; - } - - // on startup, socket is not listening yet - DBDirectClient cli(txn); - cfg = cli.findOne( rsConfigNs, Query() ).getOwned(); - count = cli.count(rsConfigNs); - } - - if( count > 1 ) - uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString()); - - if( cfg.isEmpty() ) { - version = EMPTYCONFIG; - return; - } - version = -1; - } - catch( DBException& e) { - version = v; - LOG(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog; - return; - } - - from(cfg); - checkRsConfig(); - _ok = true; - LOG(level) << "replSet load config ok from " << (isSelf(h) ? "self" : h.toString()) << rsLog; - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/rs_config.h b/src/mongo/db/repl/rs_config.h deleted file mode 100644 index 3e33277dd8a..00000000000 --- a/src/mongo/db/repl/rs_config.h +++ /dev/null @@ -1,321 +0,0 @@ -// rs_config.h -// repl set configuration -// - -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/jsobj.h" -#include "mongo/util/concurrency/list.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { - - class OperationContext; - -namespace repl { - class Member; - const std::string rsConfigNs = "local.system.replset"; - - class ReplSetConfig { - enum { EMPTYCONFIG = -2 }; - struct TagSubgroup; - - // Protects _groups. - static mongo::mutex groupMx; - public: - /** - * This contacts the given host and tries to get a config from them. - * - * This sends a test heartbeat to the host and, if all goes well and the - * host has a more recent config, fetches the config and loads it (see - * from(). - * - * If it's contacting itself, it skips the heartbeat (for obvious - * reasons.) If something is misconfigured, throws an exception. If the - * host couldn't be queried or is just blank, ok() will be false. - */ - static ReplSetConfig* make(OperationContext* txn, const HostAndPort& h); - - static ReplSetConfig* make(OperationContext* txn, BSONObj cfg, bool force=false); - - /** - * This uses DBDirectClient to check itself for a config. This way we don't need to connect - * to ourselves over the network to fetch our own config. - */ - static ReplSetConfig* makeDirect(OperationContext* txn); - - bool ok() const { return _ok; } - - struct TagRule; - - struct MemberCfg { - MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false), buildIndexes(true) { } - int _id; /* ordinal */ - unsigned votes; /* how many votes this node gets. default 1. */ - HostAndPort h; - double priority; /* 0 means can never be primary */ - bool arbiterOnly; - int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */ - bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */ - bool buildIndexes; /* if false, do not create any non-_id indexes */ - std::map<std::string,std::string> tags; /* tagging for data center, rack, etc. */ - private: - std::set<TagSubgroup*> _groups; // the subgroups this member belongs to - public: - const std::set<TagSubgroup*>& groups() const { - return _groups; - } - std::set<TagSubgroup*>& groupsw() { - return _groups; - } - void check() const; /* check validity, assert if not. */ - BSONObj asBson() const; - bool potentiallyHot() const { return !arbiterOnly && priority > 0; } - void updateGroups(const OpTime& last) { - scoped_lock lk(ReplSetConfig::groupMx); - for (std::set<TagSubgroup*>::const_iterator it = groups().begin(); it != groups().end(); it++) { - (*it)->updateLast(last); - } - } - bool isSameIgnoringTags(const MemberCfg& r) const { - return _id == r._id && votes == r.votes && h == r.h && priority == r.priority && - arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && - hidden == r.hidden && buildIndexes == r.buildIndexes; - } - bool operator==(const MemberCfg& r) const { - if (!tags.empty() || !r.tags.empty()) { - if (tags.size() != r.tags.size()) { - return false; - } - - // if they are the same size and not equal, at least one - // element in A must be different in B - for (std::map<std::string,std::string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) { - std::map<std::string,std::string>::const_iterator rit = r.tags.find((*lit).first); - - if (rit == r.tags.end() || (*lit).second != (*rit).second) { - return false; - } - } - } - - return isSameIgnoringTags(r); - } - bool operator!=(const MemberCfg& r) const { return !(*this == r); } - }; - - std::vector<MemberCfg> members; - std::string _id; - int version; - - BSONObj getLastErrorDefaults; - std::map<std::string,TagRule*> rules; - - std::list<HostAndPort> otherMemberHostnames() const; // except self - - /** @return true if could connect, and there is no cfg object there at all */ - bool empty() const { return version == EMPTYCONFIG; } - - std::string toString() const { return asBson().toString(); } - - /** validate the settings. does not call check() on each member, you have to do that separately. */ - void checkRsConfig() const; - - /** check if modification makes sense */ - static Status legalChange(const ReplSetConfig& old, const ReplSetConfig& n); - - /** - * 1. Checks the validity of member variables. (may uassert) - * 2. Saves a BSON copy of the config to local.system.replset - * 3. If 'comment' isn't empty and we're a primary or not yet initiated, log an 'n' op - * to the oplog. This is important because it establishes our lastOpWritten time. - */ - void saveConfigLocally(OperationContext* txn, BSONObj comment); // to local db - - /** - * Update members' groups when the config changes but members stay the same. - */ - void updateMembers(List1<Member> &dest) const; - - BSONObj asBson() const; - - /** - * Getter and setter for _majority. This is almost always - * members.size()/2+1, but can be the number of non-arbiter members if - * there are more arbiters than non-arbiters (writing to 3 out of 7 - * servers is safe if 4 of the servers are arbiters). - */ - private: - void setMajority(); - public: - int getMajority() const; - - /** - * Get the timeout to use for heartbeats. - */ - int getHeartbeatTimeout() const; - - /** - * Default timeout: 10 seconds - */ - static const int DEFAULT_HB_TIMEOUT; - - /** - * Returns if replication chaining is allowed. - */ - bool chainingAllowed() const; - - private: - ReplSetConfig(); - void init(OperationContext* txn, const HostAndPort& h); - void init(OperationContext* txn, BSONObj cfg, bool force); - - /** - * If replication can be chained. If chaining is disallowed, it can still be explicitly - * enabled via the replSetSyncFrom command, but it will not happen automatically. - */ - bool _chainingAllowed; - int _majority; - bool _ok; - - /** - * This function takes a config BSON and converts it into actual Member objects - * with the proper config filled in. It then pushes all the new Members onto - * the member variable "members". It also does some config checks and might uassert. - */ - void from(BSONObj); - - /** - * Just sets "version" to -5 and "_ok" to false. - */ - void clear(); - - /** - * The timeout to use for heartbeats - */ - int _heartbeatTimeout; - - struct TagClause; - /** - * This is a logical grouping of servers. It is pointed to by a set of - * servers with a certain tag. - * - * For example, suppose servers A, B, and C have the tag "dc" : "nyc". If we - * have a rule {"dc" : 2}, then we want A _or_ B _or_ C to have the - * write for one of the "dc" critiria to be fulfilled, so all three will - * point to this subgroup. When one of their oplog-tailing cursors is - * updated, this subgroup is updated. - */ - struct TagSubgroup : boost::noncopyable { - ~TagSubgroup(); // never called; not defined - TagSubgroup(const std::string& nm) : name(nm) { } - const std::string name; - OpTime last; - std::vector<TagClause*> clauses; - - // this probably won't actually point to valid members after the - // subgroup is created, as initFromConfig() makes a copy of the - // config - std::set<MemberCfg*> m; - - void updateLast(const OpTime& op); - - //string toString() const; - - /** - * If two tags have the same name, they should compare as equal so - * that members don't have to update two identical groups on writes. - */ - bool operator() (TagSubgroup& lhs, TagSubgroup& rhs) const { - return lhs.name < rhs.name; - } - }; - - /** - * An argument in a rule. For example, if we had the rule {dc : 2, - * machines : 3}, "dc" : 2 and "machines" : 3 would be two TagClauses. - * - * Each tag clause has a set of associated subgroups. For example, if - * we had "dc" : 2, our subgroups might be "nyc", "sf", and "hk". - */ - struct TagClause { - OpTime last; - std::map<std::string,TagSubgroup*> subgroups; - TagRule *rule; - std::string name; - /** - * If we have get a clause like {machines : 3} and this server is - * tagged with "machines", then it's really {machines : 2}, as we - * will always be up-to-date. So, target would be 3 and - * actualTarget would be 2, in that example. - */ - int target; - int actualTarget; - - void updateLast(const OpTime& op); - std::string toString() const; - }; - - /** - * Parses getLastErrorModes. - */ - void parseRules(const BSONObj& modes); - - /** - * Create a hash containing every possible clause that could be used in a - * rule and the servers related to that clause. - * - * For example, suppose we have the following servers: - * A {"dc" : "ny", "ny" : "rk1"} - * B {"dc" : "ny", "ny" : "rk1"} - * C {"dc" : "ny", "ny" : "rk2"} - * D {"dc" : "sf", "sf" : "rk1"} - * E {"dc" : "sf", "sf" : "rk2"} - * - * This would give us the possible criteria: - * "dc" -> {A, B, C},{D, E} - * "ny" -> {A, B},{C} - * "sf" -> {D},{E} - */ - void _populateTagMap(std::map<std::string,TagClause> &tagMap); - - public: - struct TagRule { - std::vector<TagClause*> clauses; - OpTime last; - - void updateLast(const OpTime& op); - std::string toString() const; - }; - }; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index cb454347ba3..33a0a296075 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -45,12 +45,10 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/member.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/repl/rs_initialsync.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/repl/sync_tail.h" diff --git a/src/mongo/db/repl/connections.cpp b/src/mongo/db/repl/scoped_conn.cpp index 6abcc60d0b6..cd81fc24d1b 100644 --- a/src/mongo/db/repl/connections.cpp +++ b/src/mongo/db/repl/scoped_conn.cpp @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/connections.h" +#include "mongo/db/repl/scoped_conn.h" #include "mongo/db/repl/rslog.h" #include "mongo/util/log.h" @@ -40,6 +40,22 @@ namespace mongo { namespace repl { + static const int DEFAULT_HEARTBEAT_TIMEOUT_SECS = 10; + + // This is a bitmask with the first bit set. It's used to mark connections that should be kept + // open during stepdowns + const unsigned ScopedConn::keepOpen = 1; + ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); + mutex ScopedConn::mapMutex("ScopedConn::mapMutex"); + + ScopedConn::ConnectionInfo::ConnectionInfo() : lock("ConnectionInfo"), + cc(new DBClientConnection(/*reconnect*/ true, + /*replicaSet*/ 0, + /*timeout*/ DEFAULT_HEARTBEAT_TIMEOUT_SECS)), + connected(false) { + cc->_logLevel = logger::LogSeverity::Debug(2); + } + // we should already be locked... bool ScopedConn::connect() { std::string err; diff --git a/src/mongo/db/repl/connections.h b/src/mongo/db/repl/scoped_conn.h index dec96be2215..d8bed889d6d 100644 --- a/src/mongo/db/repl/connections.h +++ b/src/mongo/db/repl/scoped_conn.h @@ -36,8 +36,6 @@ #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/internal_user_auth.h" #include "mongo/db/auth/security_key.h" -#include "mongo/db/repl/health.h" -#include "mongo/db/repl/rs_config.h" namespace mongo { namespace repl { @@ -100,13 +98,7 @@ namespace repl { mongo::mutex lock; scoped_ptr<DBClientConnection> cc; bool connected; - ConnectionInfo() : lock("ConnectionInfo"), - cc(new DBClientConnection(/*reconnect*/ true, - /*replicaSet*/ 0, - /*timeout*/ ReplSetConfig::DEFAULT_HB_TIMEOUT)), - connected(false) { - cc->_logLevel = logger::LogSeverity::Debug(2); - } + ConnectionInfo(); void tagPort() { MessagingPort& mp = cc->port(); diff --git a/src/mongo/db/repl/state_box.cpp b/src/mongo/db/repl/state_box.cpp deleted file mode 100644 index 0b24c1cc582..00000000000 --- a/src/mongo/db/repl/state_box.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/state_box.h" - -#include "mongo/db/repl/rslog.h" -#include "mongo/util/log.h" - -namespace mongo { - -namespace repl { - - void StateBox::change(MemberState s, const Member *self) { - rwlock lk(m, true); - if( sp.state != s ) { - log() << "replSet " << s.toString() << rsLog; - } - sp.state = s; - if( s.primary() ) { - sp.primary = self; - } - else { - if( self == sp.primary ) - sp.primary = 0; - } - } - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/state_box.h b/src/mongo/db/repl/state_box.h deleted file mode 100644 index 1a889ee3035..00000000000 --- a/src/mongo/db/repl/state_box.h +++ /dev/null @@ -1,83 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/repl/health.h" -#include "mongo/db/repl/member_state.h" -#include "mongo/util/concurrency/rwlock.h" - -namespace mongo { -namespace repl { - - class Member; - - /* safe container for our state that keeps member pointer and state variables always aligned */ - class StateBox : boost::noncopyable { - public: - struct SP { // SP is like pair<MemberState,const Member *> but nicer - SP() : state(MemberState::RS_STARTUP), primary(0) { } - MemberState state; - const Member *primary; - }; - const SP get() { - rwlock lk(m, false); - return sp; - } - MemberState getState() const { - rwlock lk(m, false); - return sp.state; - } - const Member* getPrimary() const { - rwlock lk(m, false); - return sp.primary; - } - void change(MemberState s, const Member *self); - void set(MemberState s, const Member *p) { - rwlock lk(m, true); - sp.state = s; - sp.primary = p; - } - void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); } - void setOtherPrimary(const Member *mem) { - rwlock lk(m, true); - verify( !sp.state.primary() ); - sp.primary = mem; - } - void noteRemoteIsPrimary(const Member *remote) { - rwlock lk(m, true); - verify(!sp.state.primary()); - sp.primary = remote; - } - StateBox() : m("StateBox") { } - private: - RWLock m; - SP sp; - }; -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index ca15023b97e..71c1c6f0485 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -42,7 +42,6 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/member.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/operation_context.h" diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ef53099dd7c..b779fb6e16b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/sync_tail.h" #include <boost/functional/hash.hpp> +#include <boost/ref.hpp> #include "third_party/murmurhash3/MurmurHash3.h" #include "mongo/base/counter.h" @@ -50,7 +51,6 @@ #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/db/operation_context_impl.h" @@ -535,26 +535,6 @@ namespace { // we have to check this before calling mgr, as we must be a secondary to // become primary tryToGoLiveAsASecondary(&txn, replCoord); - - // TODO(emilkie): This can be removed once we switch over from legacy; - // this code is what moves 1-node sets to PRIMARY state. - // normally msgCheckNewState gets called periodically, but in a single node - // replset there are no heartbeat threads, so we do it here to be sure. this is - // relevant if the singleton member has done a stepDown() and needs to come back - // up. - if (theReplSet && - theReplSet->config().members.size() == 1 && - theReplSet->myConfig().potentiallyHot()) { - Manager* mgr = theReplSet->mgr; - // When would mgr be null? During replsettest'ing, in which case we should - // fall through and actually apply ops as if we were a real secondary. - if (mgr) { - mgr->send(stdx::bind(&Manager::msgCheckNewState, theReplSet->mgr)); - sleepsecs(1); - // There should never be ops to sync in a 1-member set, anyway - return; - } - } } const int slaveDelaySecs = replCoord->getSlaveDelaySecs().total_seconds(); @@ -594,16 +574,6 @@ namespace { OpTime minValid = lastOp["ts"]._opTime(); setMinValid(&txn, minValid); multiApply(ops.getDeque()); - - // If we're just testing (no manager), don't keep looping if we exhausted the bgqueue - // TODO(spencer): Remove repltest.cpp dbtest or make this work with the new replication - // coordinator - if (theReplSet && !theReplSet->mgr) { - BSONObj op; - if (!peek(&op)) { - return; - } - } } } diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp deleted file mode 100644 index 8b9174c1271..00000000000 --- a/src/mongo/db/repl/write_concern.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/write_concern.h" - -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/auth/user_name.h" -#include "mongo/db/client.h" -#include "mongo/db/commands/fsync.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/instance.h" -#include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" - -//#define REPLDEBUG(x) log() << "replBlock: " << x << endl; -#define REPLDEBUG(x) - -namespace mongo { -namespace repl { - - using namespace mongoutils; - - class SlaveTracking { // SERVER-4328 todo review - public: - - struct Ident { - - Ident(const BSONObj& r, const BSONObj& config) { - BSONObjBuilder b; - b.appendElements( r ); - b.append( "config" , config ); - obj = b.obj(); - } - - bool operator<( const Ident& other ) const { - return obj["_id"].OID() < other.obj["_id"].OID(); - } - - BSONObj obj; - }; - - SlaveTracking() : _mutex("SlaveTracking") { - } - - void reset() { - scoped_lock mylk(_mutex); - _slaves.clear(); - } - - bool update(const BSONObj& rid, const BSONObj config, OpTime last) { - REPLDEBUG( config << " " << rid << " " << ns << " " << last ); - - Ident ident(rid, config); - - scoped_lock mylk(_mutex); - - if (last > _slaves[ident]) { - _slaves[ident] = last; - - // update write concern tags if this node is primary - if (theReplSet && theReplSet->isPrimary()) { - const Member* mem = theReplSet->findById(ident.obj["config"]["_id"].Int()); - if (!mem) { - return false; - } - ReplSetConfig::MemberCfg cfg = mem->config(); - cfg.updateGroups(last); - } - - _threadsWaitingForReplication.notify_all(); - } - return true; - } - - bool opReplicatedEnough( OpTime op , BSONElement w ) { - RARELY { - REPLDEBUG( "looking for : " << op << " w=" << w ); - } - - if (w.isNumber()) { - return replicatedToNum(op, w.numberInt()); - } - - uassert( 16250 , "w has to be a string or a number" , w.type() == String ); - - if (!theReplSet) { - return false; - } - - return opReplicatedEnough( op, w.String() ); - } - - bool opReplicatedEnough( OpTime op , const string& wStr ) { - if (wStr == "majority") { - // use the entire set, including arbiters, to prevent writing - // to a majority of the set but not a majority of voters - return replicatedToNum(op, theReplSet->config().getMajority()); - } - - map<string,ReplSetConfig::TagRule*>::const_iterator it = theReplSet->config().rules.find(wStr); - uassert(ErrorCodes::UnknownReplWriteConcern, - str::stream() << "unrecognized getLastError mode: " << wStr, - it != theReplSet->config().rules.end()); - - return op <= (*it).second->last; - } - - bool replicatedToNum(OpTime& op, int w) { - massert(ErrorCodes::NotMaster, - "replicatedToNum called but not master anymore", - getGlobalReplicationCoordinator()->getReplicationMode() != - ReplicationCoordinator::modeReplSet || - getGlobalReplicationCoordinator()->getCurrentMemberState().primary()); - - if ( w <= 1 ) - return true; - - w--; // now this is the # of slaves i need - scoped_lock mylk(_mutex); - return _replicatedToNum_slaves_locked( op, w ); - } - - bool waitForReplication(OpTime& op, int w, int maxSecondsToWait) { - static const int noLongerMasterAssertCode = ErrorCodes::NotMaster; - massert(noLongerMasterAssertCode, - "waitForReplication called but not master anymore", - getGlobalReplicationCoordinator()->getReplicationMode() != - ReplicationCoordinator::modeReplSet || - getGlobalReplicationCoordinator()->getCurrentMemberState().primary()); - - if ( w <= 1 ) - return true; - - w--; // now this is the # of slaves i need - - boost::xtime xt; - boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC); - xt.sec += maxSecondsToWait; - - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - scoped_lock mylk(_mutex); - while ( ! _replicatedToNum_slaves_locked( op, w ) ) { - if ( ! _threadsWaitingForReplication.timed_wait( mylk.boost() , xt ) ) { - massert(noLongerMasterAssertCode, - "waitForReplication called but not master anymore", - replCoord->getReplicationMode() != ReplicationCoordinator::modeReplSet - || replCoord->getCurrentMemberState().primary()); - return false; - } - massert(noLongerMasterAssertCode, - "waitForReplication called but not master anymore", - replCoord->getReplicationMode() != ReplicationCoordinator::modeReplSet - || replCoord->getCurrentMemberState().primary()); - } - return true; - } - - bool _replicatedToNum_slaves_locked(OpTime& op, int numSlaves ) { - for ( map<Ident,OpTime>::iterator i=_slaves.begin(); i!=_slaves.end(); i++) { - OpTime s = i->second; - if ( s < op ) { - continue; - } - if ( --numSlaves == 0 ) - return true; - } - return numSlaves <= 0; - } - - std::vector<BSONObj> getHostsAtOp(const OpTime& op) { - std::vector<BSONObj> result; - if (theReplSet) { - result.push_back(theReplSet->myConfig().asBson()); - } - - scoped_lock mylk(_mutex); - for (map<Ident,OpTime>::iterator i = _slaves.begin(); i != _slaves.end(); i++) { - OpTime replicatedTo = i->second; - if (replicatedTo >= op) { - result.push_back(i->first.obj["config"].Obj()); - } - } - - return result; - } - - unsigned getSlaveCount() const { - scoped_lock mylk(_mutex); - - return _slaves.size(); - } - - // need to be careful not to deadlock with this - mutable mongo::mutex _mutex; - boost::condition _threadsWaitingForReplication; - - map<Ident,OpTime> _slaves; - - } slaveTracking; - - bool updateSlaveTracking(const BSONObj& rid, - const BSONObj config, - OpTime last) { - return slaveTracking.update(rid, config, last); - } - - bool opReplicatedEnough( OpTime op , BSONElement w ) { - return slaveTracking.opReplicatedEnough( op , w ); - } - - bool opReplicatedEnough( OpTime op , int w ) { - return slaveTracking.replicatedToNum( op , w ); - } - - bool opReplicatedEnough( OpTime op , const string& w ) { - return slaveTracking.opReplicatedEnough( op , w ); - } - - bool waitForReplication( OpTime op , int w , int maxSecondsToWait ) { - return slaveTracking.waitForReplication( op, w, maxSecondsToWait ); - } - - vector<BSONObj> getHostsWrittenTo( const OpTime& op ) { - return slaveTracking.getHostsAtOp(op); - } - - void resetSlaveCache() { - slaveTracking.reset(); - } - - unsigned getSlaveCount() { - return slaveTracking.getSlaveCount(); - } -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h deleted file mode 100644 index 2ac3f23ab1a..00000000000 --- a/src/mongo/db/repl/write_concern.h +++ /dev/null @@ -1,66 +0,0 @@ -/** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include <vector> - -#include "mongo/bson/optime.h" -#include "mongo/db/jsobj.h" - -/** - local.slaves - current location for all slaves - - */ -namespace mongo { - class CurOp; - -namespace repl { - - /** - * This updates the slave tracking map and updates the tag groups. - * - * @returns false when the member cannot be found - */ - bool updateSlaveTracking(const BSONObj& rid, - const BSONObj config, - OpTime last); - - /** @return true if op has made it to w servers */ - bool opReplicatedEnough( OpTime op , int w ); - bool opReplicatedEnough( OpTime op , const std::string& w ); - bool opReplicatedEnough( OpTime op , BSONElement w ); - - bool waitForReplication( OpTime op , int w , int maxSecondsToWait ); - - std::vector<BSONObj> getHostsWrittenTo( const OpTime& op ); - - void resetSlaveCache(); - unsigned getSlaveCount(); -} // namespace repl -} // namespace mongo |