diff options
Diffstat (limited to 'src/mongo/db/repl/rs_initialsync.cpp')
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp new file mode 100644 index 00000000000..b67c0d71b83 --- /dev/null +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -0,0 +1,271 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../repl.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../oplogreader.h" +#include "../../util/mongoutils/str.h" +#include "../dbhelpers.h" +#include "rs_optime.h" +#include "../oplog.h" + +namespace mongo { + + using namespace mongoutils; + using namespace bson; + + void dropAllDatabasesExceptLocal(); + + // add try/catch with sleep + + void isyncassert(const string& msg, bool expr) { + if( !expr ) { + string m = str::stream() << "initial sync " << msg; + theReplSet->sethbmsg(m, 0); + uasserted(13404, m); + } + } + + void ReplSetImpl::syncDoInitialSync() { + createOplog(); + + while( 1 ) { + try { + _syncDoInitialSync(); + break; + } + catch(DBException& e) { + sethbmsg("initial sync exception " + e.toString(), 0); + sleepsecs(30); + } + } + } + + /* todo : progress metering to sethbmsg. */ + static bool clone(const char *master, string db) { + string err; + return cloneFrom(master, err, db, false, + /* slave_ok */ true, true, false, /*mayYield*/true, /*mayBeInterrupted*/false); + } + + void _logOpObjRS(const BSONObj& op); + + static void emptyOplog() { + writelock lk(rsoplog); + Client::Context ctx(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); + + // temp + if( d && d->stats.nrecords == 0 ) + return; // already empty, ok. + + LOG(1) << "replSet empty oplog" << rsLog; + d->emptyCappedCollection(rsoplog); + } + + Member* ReplSetImpl::getMemberToSyncTo() { + Member *closest = 0; + time_t now = 0; + bool buildIndexes = true; + + // wait for 2N pings before choosing a sync target + if (_cfg) { + int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; + + if (needMorePings > 0) { + OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; + return NULL; + } + + buildIndexes = myConfig().buildIndexes; + } + + // find the member with the lowest ping time that has more data than me + for (Member *m = _members.head(); m; m = m->next()) { + if (m->hbinfo().up() && + // make sure members with buildIndexes sync from other members w/indexes + (!buildIndexes || (buildIndexes && m->config().buildIndexes)) && + (m->state() == MemberState::RS_PRIMARY || + (m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) && + (!closest || m->hbinfo().ping < closest->hbinfo().ping)) { + + map<string,time_t>::iterator vetoed = _veto.find(m->fullName()); + if (vetoed == _veto.end()) { + closest = m; + break; + } + + if (now == 0) { + now = time(0); + } + + // if this was on the veto list, check if it was vetoed in the last "while" + if ((*vetoed).second < now) { + _veto.erase(vetoed); + closest = m; + break; + } + + // if it was recently vetoed, skip + log() << "replSet not trying to sync from " << (*vetoed).first + << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog; + } + } + + { + lock lk(this); + + if (!closest) { + _currentSyncTarget = NULL; + return NULL; + } + + _currentSyncTarget = closest; + } + + sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); + + return closest; + } + + void ReplSetImpl::veto(const string& host, const unsigned secs) { + _veto[host] = time(0)+secs; + } + + /** + * Do the initial sync for this member. + */ + void ReplSetImpl::_syncDoInitialSync() { + sethbmsg("initial sync pending",0); + + // if this is the first node, it may have already become primary + if ( box.getState().primary() ) { + sethbmsg("I'm already primary, no need for initial sync",0); + return; + } + + const Member *source = getMemberToSyncTo(); + if (!source) { + sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); + sleepsecs(15); + return; + } + + string sourceHostname = source->h().toString(); + OplogReader r; + if( !r.connect(sourceHostname) ) { + sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); + sleepsecs(15); + return; + } + + BSONObj lastOp = r.getLastOp(rsoplog); + if( lastOp.isEmpty() ) { + sethbmsg("initial sync couldn't read remote oplog", 0); + sleepsecs(15); + return; + } + OpTime startingTS = lastOp["ts"]._opTime(); + + if (replSettings.fastsync) { + log() << "fastsync: skipping database clone" << rsLog; + } + else { + sethbmsg("initial sync drop all databases", 0); + dropAllDatabasesExceptLocal(); + + sethbmsg("initial sync clone all databases", 0); + + list<string> dbs = r.conn()->getDatabaseNames(); + for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { + string db = *i; + if( db != "local" ) { + sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + bool ok; + { + writelock lk(db); + Client::Context ctx(db); + ok = clone(sourceHostname.c_str(), db); + } + if( !ok ) { + sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); + veto(source->fullName(), 600); + sleepsecs(300); + return; + } + } + } + } + + sethbmsg("initial sync query minValid",0); + + /* our cloned copy will be strange until we apply oplog events that occurred + through the process. we note that time point here. */ + BSONObj minValid = r.getLastOp(rsoplog); + isyncassert( "getLastOp is empty ", !minValid.isEmpty() ); + OpTime mvoptime = minValid["ts"]._opTime(); + assert( !mvoptime.isNull() ); + assert( mvoptime >= startingTS ); + + // apply startingTS..mvoptime portion of the oplog + { + // note we assume here that this call does not throw + if( ! initialSyncOplogApplication(startingTS, mvoptime) ) { + log() << "replSet initial sync failed during oplog application phase" << rsLog; + + emptyOplog(); // otherwise we'll be up! + + lastOpTimeWritten = OpTime(); + lastH = 0; + + log() << "replSet cleaning up [1]" << rsLog; + { + writelock lk("local."); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } + log() << "replSet cleaning up [2]" << rsLog; + + log() << "replSet initial sync failed will try again" << endl; + + sleepsecs(5); + return; + } + } + + sethbmsg("initial sync finishing up",0); + + assert( !box.getState().primary() ); // wouldn't make sense if we were. + + { + writelock lk("local."); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + try { + log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog; + } + catch(...) { } + Helpers::putSingleton("local.replset.minvalid", minValid); + cx.db()->flushFiles(true); + } + + sethbmsg("initial sync done",0); + } + +} |