diff options
Diffstat (limited to 'src/mongo/db/repl/rs_initialsync.cpp')
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 378 |
1 files changed, 108 insertions, 270 deletions
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index a4d8f77045a..5709f6eb98c 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/rs.h" +#include "mongo/db/repl/rs_initialsync.h" #include "mongo/bson/optime.h" #include "mongo/db/auth/authorization_manager.h" @@ -42,7 +42,6 @@ #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/initial_sync.h" -#include "mongo/db/repl/member.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" @@ -52,50 +51,14 @@ #include "mongo/util/mongoutils/str.h" namespace mongo { - namespace repl { +namespace { - using namespace mongoutils; - - // add try/catch with sleep - - void isyncassert(const string& msg, bool expr) { - if( !expr ) { - string m = str::stream() << "initial sync " << msg; - theReplSet->sethbmsg(m, 0); - uasserted(13404, m); - } - } - - void ReplSetImpl::syncDoInitialSync() { - static const int maxFailedAttempts = 10; - - OperationContextImpl txn; - createOplog(&txn); - - int failedAttempts = 0; - while ( failedAttempts < maxFailedAttempts ) { - try { - _initialSync(); - break; - } - catch(DBException& e) { - failedAttempts++; - str::stream msg; - msg << "initial sync exception: "; - msg << e.toString() << " " << (maxFailedAttempts - failedAttempts) << " attempts remaining" ; - sethbmsg(msg, 0); - sleepsecs(30); - } - } - fassert( 16233, failedAttempts < maxFailedAttempts); - } - - bool ReplSetImpl::_initialSyncClone(OperationContext* txn, - Cloner& cloner, - const std::string& host, - const list<string>& dbs, - bool dataPass) { + bool _initialSyncClone(OperationContext* txn, + Cloner& cloner, + const std::string& host, + const list<string>& dbs, + bool dataPass) { for( list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++ ) { const string db = *i; @@ -103,9 +66,9 @@ namespace repl { continue; if ( dataPass ) - sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + log() << "initial sync cloning db: " << db; else - sethbmsg( str::stream() << "initial sync cloning indexes for : " << db , 0); + log() << "initial sync cloning indexes for : " << db; string err; int errCode; @@ -124,10 +87,9 @@ namespace repl { Lock::DBLock dbWrite(txn->lockState(), db, newlm::MODE_X); if (!cloner.go(txn, db, host, options, NULL, err, &errCode)) { - sethbmsg(str::stream() << "initial sync: error while " - << (dataPass ? "cloning " : "indexing ") << db - << ". " << (err.empty() ? "" : err + ". ") - << "sleeping 5 minutes" ,0); + log() << "initial sync: error while " + << (dataPass ? "cloning " : "indexing ") << db + << ". " << (err.empty() ? "" : err + ". "); return false; } } @@ -135,140 +97,6 @@ namespace repl { return true; } - static void emptyOplog(OperationContext* txn) { - Client::WriteContext ctx(txn, rsoplog); - - Collection* collection = ctx.ctx().db()->getCollection(txn, rsoplog); - - // temp - if( collection->numRecords(txn) == 0 ) - return; // already empty, ok. - - LOG(1) << "replSet empty oplog" << rsLog; - uassertStatusOK( collection->truncate(txn) ); - ctx.commit(); - } - - const Member* ReplSetImpl::getMemberToSyncTo() { - lock lk(this); - - // if we have a target we've requested to sync from, use it - - if (_forceSyncTarget) { - Member* target = _forceSyncTarget; - _forceSyncTarget = 0; - sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0); - return target; - } - - const Member* primary = box.getPrimary(); - - // wait for 2N pings before choosing a sync target - if (_cfg) { - int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; - - if (needMorePings > 0) { - OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; - return NULL; - } - - // If we are only allowed to sync from the primary, return that - if (!_cfg->chainingAllowed()) { - // Returns NULL if we cannot reach the primary - return primary; - } - } - - // find the member with the lowest ping time that has more data than me - - // Find primary's oplog time. Reject sync candidates that are more than - // maxSyncSourceLagSecs seconds behind. - OpTime primaryOpTime; - if (primary) - primaryOpTime = primary->hbinfo().opTime; - else - // choose a time that will exclude no candidates, since we don't see a primary - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); - - if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) { - // erh - I think this means there was just a new election - // and we don't yet know the new primary's optime - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); - } - - OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0); - - Member *closest = 0; - time_t now = 0; - - // Make two attempts. The first attempt, we ignore those nodes with - // slave delay higher than our own. The second attempt includes such - // nodes, in case those are the only ones we can reach. - // This loop attempts to set 'closest'. - for (int attempts = 0; attempts < 2; ++attempts) { - for (Member *m = _members.head(); m; m = m->next()) { - if (!m->syncable()) - continue; - - if (m->state() == MemberState::RS_SECONDARY) { - // only consider secondaries that are ahead of where we are - if (m->hbinfo().opTime <= lastOpTimeWritten) - continue; - // omit secondaries that are excessively behind, on the first attempt at least. - if (attempts == 0 && - m->hbinfo().opTime < oldestSyncOpTime) - continue; - } - - // omit nodes that are more latent than anything we've already considered - if (closest && - (m->hbinfo().ping > closest->hbinfo().ping)) - continue; - - if (attempts == 0 && - (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) { - continue; // skip this one in the first attempt - } - - map<string,time_t>::iterator vetoed = _veto.find(m->fullName()); - if (vetoed != _veto.end()) { - // Do some veto housekeeping - if (now == 0) { - now = time(0); - } - - // if this was on the veto list, check if it was vetoed in the last "while". - // if it was, skip. - if (vetoed->second >= now) { - if (time(0) % 5 == 0) { - log() << "replSet not trying to sync from " << (*vetoed).first - << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog; - } - continue; - } - _veto.erase(vetoed); - // fall through, this is a valid candidate now - } - // This candidate has passed all tests; set 'closest' - closest = m; - } - if (closest) break; // no need for second attempt - } - - if (!closest) { - return NULL; - } - - sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); - - return closest; - } - - void ReplSetImpl::veto(const string& host, const unsigned secs) { - lock lk(this); - _veto[host] = time(0)+secs; - } - /** * Replays the sync target's oplog from lastOp to the latest op on the sync target. * @@ -277,11 +105,10 @@ namespace repl { * @param source the sync target * @return if applying the oplog succeeded */ - bool ReplSetImpl::_initialSyncApplyOplog( OperationContext* ctx, - repl::SyncTail& syncer, - OplogReader* r, - const Member* source) { - const OpTime startOpTime = lastOpTimeWritten; + bool _initialSyncApplyOplog( OperationContext* ctx, + repl::SyncTail& syncer, + OplogReader* r) { + const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime(); BSONObj lastOp; try { // It may have been a long time since we last used this connection to @@ -291,16 +118,22 @@ namespace repl { // Solution is to increase the TCP keepalive frequency. lastOp = r->getLastOp(rsoplog); } catch ( SocketException & ) { - log() << "connection lost to " << source->h().toString() << "; is your tcp keepalive interval set appropriately?"; - if( !r->connect(source->h()) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); + HostAndPort host = r->getHost(); + log() << "connection lost to " << host.toString() << + "; is your tcp keepalive interval set appropriately?"; + if( !r->connect(host) ) { + error() << "initial sync couldn't connect to " << host.toString(); throw; } // retry lastOp = r->getLastOp(rsoplog); } - isyncassert( "lastOp is empty ", !lastOp.isEmpty() ); + if (lastOp.isEmpty()) { + error() << "initial sync lastOp is empty"; + sleepsecs(1); + return false; + } OpTime stopOpTime = lastOp["ts"]._opTime(); @@ -320,7 +153,7 @@ namespace repl { << rsLog; getGlobalReplicationCoordinator()->setMyLastOptime(ctx, OpTime()); - lastH = 0; + BackgroundSync::get()->setLastHash(0); sleepsecs(5); return false; @@ -350,42 +183,36 @@ namespace repl { * this member should have consistent data. 8 is "cosmetic," it is only to get this member * closer to the latest op time before it can transition out of startup state */ - void ReplSetImpl::_initialSync() { - InitialSync init(BackgroundSync::get()); - SyncTail tail(BackgroundSync::get(), multiSyncApply); - sethbmsg("initial sync pending",0); - - // if this is the first node, it may have already become primary - if ( box.getState().primary() ) { - sethbmsg("I'm already primary, no need for initial sync",0); - return; - } - - const Member *source = getMemberToSyncTo(); - if (!source) { - sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); - sleepsecs(15); - return; - } + void _initialSync() { + BackgroundSync* bgsync(BackgroundSync::get()); + InitialSync init(bgsync); + SyncTail tail(bgsync, multiSyncApply); + log() << "initial sync pending"; - string sourceHostname = source->h().toString(); - init.setHostname(sourceHostname); OplogReader r; - if( !r.connect(source->h()) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); - sleepsecs(15); + OpTime now(Milliseconds(curTimeMillis64()).total_seconds(), 0); + OperationContextImpl txn; + + ReplicationCoordinator* replCoord(getGlobalReplicationCoordinator()); + + // We must prime the sync source selector so that it considers all candidates regardless + // of oplog position, by passing in "now" as the last op fetched time. + r.connectToSyncSource(&txn, now, replCoord); + if (r.getHost().empty()) { + log() << "no valid sync sources found in current replset to do an initial sync"; + sleepsecs(3); return; } + init.setHostname(r.getHost().toString()); + BSONObj lastOp = r.getLastOp(rsoplog); - if( lastOp.isEmpty() ) { - sethbmsg("initial sync couldn't read remote oplog", 0); + if ( lastOp.isEmpty() ) { + log() << "initial sync couldn't read remote oplog"; sleepsecs(15); return; } - OperationContextImpl txn; - if (getGlobalReplicationCoordinator()->getSettings().fastsync) { log() << "fastsync: skipping database clone" << rsLog; @@ -394,54 +221,49 @@ namespace repl { _logOpObjRS(&txn, lastOp); return; } - else { - // Add field to minvalid document to tell us to restart initial sync if we crash - setInitialSyncFlag(&txn); - sethbmsg("initial sync drop all databases", 0); - dropAllDatabasesExceptLocal(&txn); + // Add field to minvalid document to tell us to restart initial sync if we crash + setInitialSyncFlag(&txn); - sethbmsg("initial sync clone all databases", 0); + log() << "initial sync drop all databases"; + dropAllDatabasesExceptLocal(&txn); - list<string> dbs = r.conn()->getDatabaseNames(); + log() << "initial sync clone all databases"; - Cloner cloner; - if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) { - veto(source->fullName(), 600); - sleepsecs(300); - return; - } + list<string> dbs = r.conn()->getDatabaseNames(); - sethbmsg("initial sync data copy, starting syncup",0); + Cloner cloner; + if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) { + return; + } - // prime oplog - init.syncApply(&txn, lastOp, false); - _logOpObjRS(&txn, lastOp); + log() << "initial sync data copy, starting syncup"; - log() << "oplog sync 1 of 3" << endl; - if (!_initialSyncApplyOplog(&txn, init, &r , source)) { - return; - } + // prime oplog + init.syncApply(&txn, lastOp, false); + _logOpObjRS(&txn, lastOp); - // Now we sync to the latest op on the sync target _again_, as we may have recloned ops - // that were "from the future" compared with minValid. During this second application, - // nothing should need to be recloned. - log() << "oplog sync 2 of 3" << endl; - if (!_initialSyncApplyOplog(&txn, init, &r , source)) { - return; - } - // data should now be consistent + log() << "oplog sync 1 of 3" << endl; + if (!_initialSyncApplyOplog(&txn, init, &r)) { + return; + } - sethbmsg("initial sync building indexes",0); - if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) { - veto(source->fullName(), 600); - sleepsecs(300); - return; - } + // Now we sync to the latest op on the sync target _again_, as we may have recloned ops + // that were "from the future" compared with minValid. During this second application, + // nothing should need to be recloned. + log() << "oplog sync 2 of 3" << endl; + if (!_initialSyncApplyOplog(&txn, init, &r)) { + return; + } + // data should now be consistent + + log() << "initial sync building indexes"; + if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) { + return; } log() << "oplog sync 3 of 3" << endl; - if (!_initialSyncApplyOplog(&txn, tail, &r, source)) { + if (!_initialSyncApplyOplog(&txn, tail, &r)) { return; } @@ -453,17 +275,12 @@ namespace repl { return; } - sethbmsg("initial sync finishing up",0); - - verify( !box.getState().primary() ); // wouldn't make sense if we were. + log() << "initial sync finishing up"; { Client::WriteContext cx(&txn, "local."); - - try { - log() << "replSet set minValid=" << lastOpTimeWritten << rsLog; - } - catch(...) { } + OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime()); + log() << "replSet set minValid=" << lastOpTimeWritten << rsLog; // Initial sync is now complete. Flag this by setting minValid to the last thing // we synced. @@ -471,18 +288,39 @@ namespace repl { // Clear the initial sync flag. clearInitialSyncFlag(&txn); + BackgroundSync::get()->setInitialSyncRequestedFlag(false); cx.commit(); } - { - boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex); - theReplSet->initialSyncRequested = false; - } // If we just cloned & there were no ops applied, we still want the primary to know where // we're up to - BackgroundSync::notify(); + bgsync->notify(); + + log() << "initial sync done"; + } +} // namespace + + void syncDoInitialSync() { + static const int maxFailedAttempts = 10; + + OperationContextImpl txn; + createOplog(&txn); - sethbmsg("initial sync done",0); + int failedAttempts = 0; + while ( failedAttempts < maxFailedAttempts ) { + try { + _initialSync(); + break; + } + catch(DBException& e) { + failedAttempts++; + mongoutils::str::stream msg; + error() << "initial sync exception: " << e.toString() << " " << + (maxFailedAttempts - failedAttempts) << " attempts remaining"; + sleepsecs(5); + } + } + fassert( 16233, failedAttempts < maxFailedAttempts); } } // namespace repl |