/** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/base/owned_pointer_vector.h" #include "mongo/base/status.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/cmdline.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/instance.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/connections.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_server_status.h" // replSettings #include "mongo/db/repl/rs.h" #include "mongo/db/server_parameters.h" #include "mongo/platform/bits.h" #include "mongo/s/d_logic.h" #include "mongo/util/net/sock.h" using namespace std; namespace mongo { using namespace bson; #ifdef MONGO_PLATFORM_64 const int ReplSetImpl::replWriterThreadCount = 16; const int ReplSetImpl::replPrefetcherThreadCount = 16; #else const int ReplSetImpl::replWriterThreadCount = 2; const int ReplSetImpl::replPrefetcherThreadCount = 2; #endif bool replSet = false; ReplSet *theReplSet = 0; // This is a bitmask with the first bit set. It's used to mark connections that should be kept // open during stepdowns const unsigned ScopedConn::keepOpen = 1; bool isCurrentlyAReplSetPrimary() { return theReplSet && theReplSet->isPrimary(); } void replset::sethbmsg(const string& s, const int level) { if (theReplSet) { theReplSet->sethbmsg(s, level); } } void ReplSetImpl::sethbmsg(const std::string& s, int logLevel) { static time_t lastLogged; _hbmsgTime = time(0); if( s == _hbmsg ) { // unchanged if( _hbmsgTime - lastLogged < 60 ) return; } unsigned sz = s.size(); if( sz >= 256 ) memcpy(_hbmsg, s.c_str(), 255); else { _hbmsg[sz] = 0; memcpy(_hbmsg, s.c_str(), sz); } if( !s.empty() ) { lastLogged = _hbmsgTime; LOG(logLevel) << "replSet " << s << rsLog; } } void ReplSetImpl::goStale(const Member* stale, const BSONObj& oldest) { log() << "replSet error RS102 too stale to catch up, at least from " << stale->fullName() << rsLog; log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; log() << "replSet oldest at " << stale->fullName() << " : " << oldest["ts"]._opTime().toStringLong() << rsLog; log() << "replSet See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember" << rsLog; // reset minvalid so that we can't become primary prematurely setMinValid(oldest); sethbmsg("error RS102 too stale to catch up"); changeState(MemberState::RS_RECOVERING); } void ReplSetImpl::assumePrimary() { LOG(2) << "replSet assuming primary" << endl; verify( iAmPotentiallyHot() ); // Wait for replication to stop and buffer to be consumed LOG(1) << "replSet waiting for replication to finish before becoming primary" << endl; replset::BackgroundSync::get()->stopReplicationAndFlushBuffer(); // Lock here to prevent stepping down & becoming primary from getting interleaved Lock::GlobalWrite lk; // Make sure that new OpTimes are higher than existing ones even with clock skew DBDirectClient c; BSONObj lastOp = c.findOne( "local.oplog.rs", Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk ); if ( !lastOp.isEmpty() ) { OpTime::setLast( lastOp[ "ts" ].date() ); } changeState(MemberState::RS_PRIMARY); } void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } bool ReplSetImpl::setMaintenanceMode(const bool inc) { lock replLock(this); // Lock here to prevent state from changing between checking the state and changing it Lock::GlobalWrite writeLock; if (box.getState().primary()) { return false; } if (inc) { log() << "replSet going into maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; _maintenanceMode++; changeState(MemberState::RS_RECOVERING); } else if (_maintenanceMode > 0) { _maintenanceMode--; // no need to change state, syncTail will try to go live as a secondary soon log() << "leaving maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; } else { return false; } fassert(16844, _maintenanceMode >= 0); return true; } Member* ReplSetImpl::getMostElectable() { lock lk(this); Member *max = 0; set::iterator it = _electableSet.begin(); while ( it != _electableSet.end() ) { const Member *temp = findById(*it); if (!temp) { log() << "couldn't find member: " << *it << endl; set::iterator it_delete = it; it++; _electableSet.erase(it_delete); continue; } if (!max || max->config().priority < temp->config().priority) { max = (Member*)temp; } it++; } return max; } void ReplSetImpl::relinquish() { { Lock::GlobalWrite lk; // so we are synchronized with _logOp() LOG(2) << "replSet attempting to relinquish" << endl; if( box.getState().primary() ) { log() << "replSet relinquishing primary state" << rsLog; changeState(MemberState::RS_SECONDARY); // close sockets that were talking to us so they don't blithly send many writes that // will fail with "not master" (of course client could check result code, but in // case they are not) log() << "replSet closing client sockets after relinquishing primary" << rsLog; MessagingPort::closeAllSockets(ScopedConn::keepOpen); } else if( box.getState().startup2() ) { // This block probably isn't necessary changeState(MemberState::RS_RECOVERING); return; } } // now that all connections were closed, strip this mongod from all sharding details // if and when it gets promoted to a primary again, only then it should reload the sharding state // the rationale here is that this mongod won't bring stale state when it regains primaryhood shardingState.resetShardingState(); } /* look freshly for who is primary - includes relinquishing ourself. */ void ReplSetImpl::forgetPrimary() { if( box.getState().primary() ) relinquish(); else { box.setOtherPrimary(0); } } // for the replSetStepDown command bool ReplSetImpl::_stepDown(int secs) { lock lk(this); if( box.getState().primary() ) { elect.steppedDown = time(0) + secs; log() << "replSet info stepping down as primary secs=" << secs << rsLog; relinquish(); return true; } return false; } bool ReplSetImpl::_freeze(int secs) { lock lk(this); /* note if we are primary we remain primary but won't try to elect ourself again until this time period expires. */ if( secs == 0 ) { elect.steppedDown = 0; log() << "replSet info 'unfreezing'" << rsLog; } else { if( !box.getState().primary() ) { elect.steppedDown = time(0) + secs; log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog; } else { log() << "replSet info received freeze command but we are primary" << rsLog; } } return true; } void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { for( Member *m = _members.head(); m; m=m->next() ) { if( m->id() == h.id() ) { m->_hbinfo.updateFromLastPoll(h); return; } } } void ReplSetImpl::msgUpdateHBRecv(unsigned id, time_t newTime) { for (Member *m = _members.head(); m; m = m->next()) { if (m->id() == id) { m->_hbinfo.lastHeartbeatRecv = newTime; return; } } } list ReplSetImpl::memberHostnames() const { list L; L.push_back(_self->h()); for( Member *m = _members.head(); m; m = m->next() ) L.push_back(m->h()); return L; } void ReplSetImpl::_fillIsMasterHost(const Member *m, vector& hosts, vector& passives, vector& arbiters) { verify( m ); if( m->config().hidden ) return; if( m->potentiallyHot() ) { hosts.push_back(m->h().toString()); } else if( !m->config().arbiterOnly ) { if( m->config().slaveDelay ) { /* hmmm - we don't list these as they are stale. */ } else { passives.push_back(m->h().toString()); } } else { arbiters.push_back(m->h().toString()); } } void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) { lock lk(this); const StateBox::SP sp = box.get(); bool isp = sp.state.primary(); b.append("setName", name()); b.append("setVersion", version()); b.append("ismaster", isp); b.append("secondary", sp.state.secondary()); { vector hosts, passives, arbiters; _fillIsMasterHost(_self, hosts, passives, arbiters); for( Member *m = _members.head(); m; m = m->next() ) { verify( m ); _fillIsMasterHost(m, hosts, passives, arbiters); } if( hosts.size() > 0 ) { b.append("hosts", hosts); } if( passives.size() > 0 ) { b.append("passives", passives); } if( arbiters.size() > 0 ) { b.append("arbiters", arbiters); } } if( !isp ) { const Member *m = sp.primary; if( m ) b.append("primary", m->h().toString()); } else { b.append("primary", _self->fullName()); } if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); if( myConfig().priority == 0 && !myConfig().arbiterOnly) b.append("passive", true); if( myConfig().slaveDelay ) b.append("slaveDelay", myConfig().slaveDelay); if( myConfig().hidden ) b.append("hidden", true); if( !myConfig().buildIndexes ) b.append("buildIndexes", false); if( !myConfig().tags.empty() ) { BSONObjBuilder a; for( map::const_iterator i = myConfig().tags.begin(); i != myConfig().tags.end(); i++ ) a.append((*i).first, (*i).second); b.append("tags", a.done()); } b.append("me", myConfig().h.toString()); } /** @param cfgString /, */ void parseReplsetCmdLine(const std::string& cfgString, string& setname, vector& seeds, set& seedSet ) { const char *p = cfgString.c_str(); const char *slash = strchr(p, '/'); if( slash ) setname = string(p, slash-p); else setname = p; uassert(13093, "bad --replSet config string format is: [/,,...]", !setname.empty()); if( slash == 0 ) return; p = slash + 1; while( 1 ) { const char *comma = strchr(p, ','); if( comma == 0 ) comma = strchr(p,0); if( p == comma ) break; { HostAndPort m; try { m = HostAndPort( string(p, comma-p) ); } catch(...) { uassert(13114, "bad --replSet seed hostname", false); } uassert(13096, "bad --replSet command line config string - dups?", seedSet.count(m) == 0 ); seedSet.insert(m); //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost()); if( m.isSelf() ) { LOG(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog; } else seeds.push_back(m); if( *comma == 0 ) break; p = comma + 1; } } } void ReplSetImpl::init(ReplSetCmdline& replSetCmdline) { mgr = new Manager(this); ghost = new GhostSync(this); _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); strcpy( _hbmsg , "initial startup" ); lastH = 0; changeState(MemberState::RS_STARTUP); _seeds = &replSetCmdline.seeds; LOG(1) << "replSet beginning startup..." << rsLog; loadConfig(); unsigned sss = replSetCmdline.seedSet.size(); for( Member *m = head(); m; m = m->next() ) { replSetCmdline.seedSet.erase(m->h()); } for( set::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { if( i->isSelf() ) { if( sss == 1 ) { LOG(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; } } else { log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; } } // Figure out indexPrefetch setting std::string& prefetch = cmdLine.rsIndexPrefetch; if (!prefetch.empty()) { IndexPrefetchConfig prefetchConfig = PREFETCH_ALL; if (prefetch == "none") prefetchConfig = PREFETCH_NONE; else if (prefetch == "_id_only") prefetchConfig = PREFETCH_ID_ONLY; else if (prefetch == "all") prefetchConfig = PREFETCH_ALL; else warning() << "unrecognized indexPrefetch setting: " << prefetch << endl; setIndexPrefetchConfig(prefetchConfig); } } ReplSetImpl::ReplSetImpl() : elect(this), _forceSyncTarget(0), _blockSync(false), _hbmsgTime(0), _self(0), _maintenanceMode(0), mgr(0), ghost(0), _writerPool(replWriterThreadCount), _prefetcherPool(replPrefetcherThreadCount), oplogVersion(0), _indexPrefetchConfig(PREFETCH_ALL) { } ReplSet::ReplSet() { } ReplSet* ReplSet::make(ReplSetCmdline& replSetCmdline) { auto_ptr ret(new ReplSet()); ret->init(replSetCmdline); return ret.release(); } void ReplSetImpl::loadLastOpTimeWritten(bool quiet) { Lock::DBRead lk(rsoplog); BSONObj o; if( Helpers::getLast(rsoplog, o) ) { lastH = o["h"].numberLong(); lastOpTimeWritten = o["ts"]._opTime(); uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull()); } } /* call after constructing to start - returns fairly quickly after launching its threads */ void ReplSetImpl::_go() { { boost::unique_lock lk(rss.mtx); while (!rss.indexRebuildDone) { rss.cond.wait(lk); } } try { loadLastOpTimeWritten(); } catch(std::exception& e) { log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; log() << e.what() << rsLog; sleepsecs(30); dbexit( EXIT_REPLICATION_ERROR ); return; } changeState(MemberState::RS_STARTUP2); startThreads(); newReplUp(); // oplog.cpp } ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; DiagStr ReplSetImpl::startupStatusMsg; ReplicationStartSynchronizer ReplSetImpl::rss; extern BSONObj *getLastErrorDefault; void ReplSetImpl::setSelfTo(Member *m) { // already locked in initFromConfig _self = m; _id = m->id(); _config = m->config(); if( m ) _buildIndexes = m->config().buildIndexes; else _buildIndexes = true; } /** @param reconf true if this is a reconfiguration and not an initial load of the configuration. @return true if ok; throws if config really bad; false if config doesn't include self */ bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { /* NOTE: haveNewConfig() writes the new config to disk before we get here. So we cannot error out at this point, except fatally. Check errors earlier. */ lock lk(this); if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) { // see comment in dbcommands.cpp for getlasterrordefault getLastErrorDefault = new BSONObj( c.getLastErrorDefaults ); } list newOnes; // additive short-cuts the new config setup. If we are just adding a // node/nodes and nothing else is changing, this is additive. If it's // not a reconfig, we're not adding anything bool additive = reconf; { unsigned nfound = 0; int me = 0; for( vector::iterator i = c.members.begin(); i != c.members.end(); i++ ) { ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { me++; } if( reconf ) { const Member *old = findById(m._id); if( old ) { nfound++; verify( (int) old->id() == m._id ); if (!old->config().isSameIgnoringTags(m)) { additive = false; } } else { newOnes.push_back(&m); } } } if( me == 0 ) { // we're not in the config -- we must have been removed if (state().shunned()) { // already took note of our ejection from the set // so just sit tight and poll again return false; } _members.orphanAll(); // kill off rsHealthPoll threads (because they Know Too Much about our past) endOldHealthTasks(); // close sockets to force clients to re-evaluate this member MessagingPort::closeAllSockets(0); // take note of our ejection changeState(MemberState::RS_SHUNNED); // go into holding pattern log() << "replSet info self not present in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; loadConfig(); // redo config from scratch return false; } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); // if we found different members that the original config, reload everything if( reconf && config().members.size() != nfound ) additive = false; } // If we are changing chaining rules, we don't want this to be an additive reconfig so that // the primary can step down and the sync targets change. // TODO: This can be removed once SERVER-5208 is fixed. if (reconf && config().chainingAllowed() != c.chainingAllowed()) { additive = false; } _cfg = new ReplSetConfig(c); dassert( &config() == _cfg ); // config() is same thing but const, so we use that when we can for clarity below verify( config().ok() ); verify( _name.empty() || _name == config()._id ); _name = config()._id; verify( !_name.empty() ); // this is a shortcut for simple changes if( additive ) { log() << "replSet info : additive change to configuration" << rsLog; for( list::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { ReplSetConfig::MemberCfg *m = *i; Member *mi = new Member(m->h, m->_id, m, false); /** we will indicate that new members are up() initially so that we don't relinquish our primary state because we can't (transiently) see a majority. they should be up as we check that new members are up before getting here on reconfig anyway. */ mi->get_hbinfo().health = 0.1; _members.push(mi); startHealthTaskFor(mi); } // if we aren't creating new members, we may have to update the // groups for the current ones _cfg->updateMembers(_members); return true; } // start with no members. if this is a reconfig, drop the old ones. _members.orphanAll(); endOldHealthTasks(); // Clear out our memory of who might have been syncing from us. // Any incoming handshake connections after this point will be newly registered. ghost->clearCache(); int oldPrimaryId = -1; { const Member *p = box.getPrimary(); if( p ) oldPrimaryId = p->id(); } forgetPrimary(); // not setting _self to 0 as other threads use _self w/o locking int me = 0; // For logging string members = ""; for( vector::const_iterator i = config().members.begin(); i != config().members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; Member *mi; members += ( members == "" ? "" : ", " ) + m.h.toString(); if( m.h.isSelf() ) { verify( me++ == 0 ); mi = new Member(m.h, m._id, &m, true); if (!reconf) { log() << "replSet I am " << m.h.toString() << rsLog; } setSelfTo(mi); if( (int)mi->id() == oldPrimaryId ) box.setSelfPrimary(mi); } else { mi = new Member(m.h, m._id, &m, false); _members.push(mi); if( (int)mi->id() == oldPrimaryId ) box.setOtherPrimary(mi); } } if( me == 0 ){ log() << "replSet warning did not detect own host in full reconfig, members " << members << " config: " << c << rsLog; } else { // Do this after we've found ourselves, since _self needs // to be set before we can start the heartbeat tasks for( Member *mb = _members.head(); mb; mb=mb->next() ) { startHealthTaskFor( mb ); } } return true; } // Our own config must be the first one. bool ReplSetImpl::_loadConfigFinish(vector& cfgs) { int v = -1; ReplSetConfig *highest = 0; int myVersion = -2000; int n = 0; for( vector::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { ReplSetConfig* cfg = *i; DEV { LOG(1) << n+1 << " config shows version " << cfg->version << rsLog; } if( ++n == 1 ) myVersion = cfg->version; if( cfg->ok() && cfg->version > v ) { highest = cfg; v = cfg->version; } } verify( highest ); if( !initFromConfig(*highest) ) return false; if( highest->version > myVersion && highest->version >= 0 ) { log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; highest->saveConfigLocally(BSONObj()); } return true; } void ReplSetImpl::loadConfig() { startupStatus = LOADINGCONFIG; startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)"); LOG(1) << "loadConfig() " << rsConfigNs << endl; while( 1 ) { try { OwnedPointerVector configs; try { configs.mutableVector().push_back(ReplSetConfig::makeDirect()); } catch(DBException& e) { log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog; } for( vector::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { try { configs.mutableVector().push_back( ReplSetConfig::make(*i) ); } catch( DBException& e ) { log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog; } } { scoped_lock lck( replSettings.discoveredSeeds_mx ); if( replSettings.discoveredSeeds.size() > 0 ) { for (set::iterator i = replSettings.discoveredSeeds.begin(); i != replSettings.discoveredSeeds.end(); i++) { try { configs.mutableVector().push_back( ReplSetConfig::make(HostAndPort(*i)) ); } catch( DBException& ) { LOG(1) << "replSet exception trying to load config from discovered seed " << *i << rsLog; replSettings.discoveredSeeds.erase(*i); } } } } if (!replSettings.reconfig.isEmpty()) { try { configs.mutableVector().push_back(ReplSetConfig::make(replSettings.reconfig, true)); } catch( DBException& re) { log() << "replSet couldn't load reconfig: " << re.what() << rsLog; replSettings.reconfig = BSONObj(); } } int nok = 0; int nempty = 0; for( vector::iterator i = configs.mutableVector().begin(); i != configs.mutableVector().end(); i++ ) { if( (*i)->ok() ) nok++; if( (*i)->empty() ) nempty++; } if( nok == 0 ) { if( nempty == (int) configs.mutableVector().size() ) { startupStatus = EMPTYCONFIG; startupStatusMsg.set("can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"); log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; static unsigned once; if( ++once == 1 ) { log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog; } if( _seeds->size() == 0 ) { LOG(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; } } else { startupStatus = EMPTYUNREACHABLE; startupStatusMsg.set("can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"); log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog; } sleepsecs(1); continue; } if( !_loadConfigFinish(configs.mutableVector()) ) { log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog; sleepsecs(20); continue; } } catch(DBException& e) { startupStatus = BADCONFIG; startupStatusMsg.set("replSet error loading set config (BADCONFIG)"); log() << "replSet error loading configurations " << e.toString() << rsLog; log() << "replSet error replication will not start" << rsLog; sethbmsg("error loading set config"); _fatal(); throw; } break; } startupStatusMsg.set("? started"); startupStatus = STARTED; } void ReplSetImpl::_fatal() { box.set(MemberState::RS_FATAL, 0); log() << "replSet error fatal, stopping replication" << rsLog; } void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { bo comment; if( addComment ) comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); newConfig.saveConfigLocally(comment); try { if (initFromConfig(newConfig, true)) { log() << "replSet replSetReconfig new config saved locally" << rsLog; } } catch(DBException& e) { log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog; _fatal(); } catch(...) { log() << "replSet error unexpected exception in haveNewConfig()" << rsLog; _fatal(); } } void Manager::msgReceivedNewConfig(BSONObj o) { log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog; scoped_ptr config(ReplSetConfig::make(o)); if( config->version > rs->config().version ) theReplSet->haveNewConfig(*config, false); else { log() << "replSet info msgReceivedNewConfig but version isn't higher " << config->version << ' ' << rs->config().version << rsLog; } } /* forked as a thread during startup it can run quite a while looking for config. but once found, a separate thread takes over as ReplSetImpl::Manager, and this thread terminates. */ void startReplSets(ReplSetCmdline *replSetCmdline) { Client::initThread("rsStart"); try { verify( theReplSet == 0 ); if( replSetCmdline == 0 ) { verify(!replSet); return; } replLocalAuth(); (theReplSet = ReplSet::make(*replSetCmdline))->go(); } catch(std::exception& e) { log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; if( theReplSet ) theReplSet->fatal(); } cc().shutdown(); } void ReplSet::shutdown() { replset::BackgroundSync::shutdown(); } void replLocalAuth() { if (!AuthorizationManager::isAuthEnabled()) return; cc().getAuthorizationSession()->grantInternalAuthorization(); } const char* ReplSetImpl::_initialSyncFlagString = "doingInitialSync"; const BSONObj ReplSetImpl::_initialSyncFlag(BSON(_initialSyncFlagString << true)); void ReplSetImpl::clearInitialSyncFlag() { Lock::DBWrite lk( "local" ); Helpers::putSingleton("local.replset.minvalid", BSON( "$unset" << _initialSyncFlag )); } void ReplSetImpl::setInitialSyncFlag() { Lock::DBWrite lk( "local" ); Helpers::putSingleton("local.replset.minvalid", BSON( "$set" << _initialSyncFlag )); } bool ReplSetImpl::getInitialSyncFlag() { Lock::DBRead lk ( "local" ); BSONObj mv; if (Helpers::getSingleton("local.replset.minvalid", mv)) { return mv[_initialSyncFlagString].trueValue(); } return false; } void ReplSetImpl::setMinValid(BSONObj obj) { BSONObjBuilder builder; BSONObjBuilder subobj(builder.subobjStart("$set")); subobj.appendTimestamp("ts", obj["ts"].date()); subobj.done(); Lock::DBWrite lk( "local" ); Helpers::putSingleton("local.replset.minvalid", builder.obj()); } OpTime ReplSetImpl::getMinValid() { Lock::DBRead lk("local.replset.minvalid"); BSONObj mv; if (Helpers::getSingleton("local.replset.minvalid", mv)) { return mv["ts"]._opTime(); } return OpTime(); } void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) { // To prevent race conditions with clearing the cache at reconfig time, // we lock the replset mutex here. { lock lk(this); ghost->associateSlave(rid, memberId); } syncSourceFeedback.associateMember(rid, memberId); } class ReplIndexPrefetch : public ServerParameter { public: ReplIndexPrefetch() : ServerParameter( ServerParameterSet::getGlobal(), "replIndexPrefetch" ) { } virtual ~ReplIndexPrefetch() { } const char * _value() { if (!theReplSet) return "uninitialized"; ReplSetImpl::IndexPrefetchConfig ip = theReplSet->getIndexPrefetchConfig(); switch (ip) { case ReplSetImpl::PREFETCH_NONE: return "none"; case ReplSetImpl::PREFETCH_ID_ONLY: return "_id_only"; case ReplSetImpl::PREFETCH_ALL: return "all"; default: return "invalid"; } } virtual void append( BSONObjBuilder& b, const string& name ) { b.append( name, _value() ); } virtual Status set( const BSONElement& newValueElement ) { if (!theReplSet) { return Status( ErrorCodes::BadValue, "replication is not enabled" ); } std::string prefetch = newValueElement.valuestrsafe(); return setFromString( prefetch ); } virtual Status setFromString( const string& prefetch ) { log() << "changing replication index prefetch behavior to " << prefetch << endl; ReplSetImpl::IndexPrefetchConfig prefetchConfig; if (prefetch == "none") prefetchConfig = ReplSetImpl::PREFETCH_NONE; else if (prefetch == "_id_only") prefetchConfig = ReplSetImpl::PREFETCH_ID_ONLY; else if (prefetch == "all") prefetchConfig = ReplSetImpl::PREFETCH_ALL; else { return Status( ErrorCodes::BadValue, str::stream() << "unrecognized indexPrefetch setting: " << prefetch ); } theReplSet->setIndexPrefetchConfig(prefetchConfig); return Status::OK(); } } replIndexPrefetch; }