summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/rs_initialsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/rs_initialsync.cpp')
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp378
1 files changed, 108 insertions, 270 deletions
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index a4d8f77045a..5709f6eb98c 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/rs.h"
+#include "mongo/db/repl/rs_initialsync.h"
#include "mongo/bson/optime.h"
#include "mongo/db/auth/authorization_manager.h"
@@ -42,7 +42,6 @@
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/initial_sync.h"
-#include "mongo/db/repl/member.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
@@ -52,50 +51,14 @@
#include "mongo/util/mongoutils/str.h"
namespace mongo {
-
namespace repl {
+namespace {
- using namespace mongoutils;
-
- // add try/catch with sleep
-
- void isyncassert(const string& msg, bool expr) {
- if( !expr ) {
- string m = str::stream() << "initial sync " << msg;
- theReplSet->sethbmsg(m, 0);
- uasserted(13404, m);
- }
- }
-
- void ReplSetImpl::syncDoInitialSync() {
- static const int maxFailedAttempts = 10;
-
- OperationContextImpl txn;
- createOplog(&txn);
-
- int failedAttempts = 0;
- while ( failedAttempts < maxFailedAttempts ) {
- try {
- _initialSync();
- break;
- }
- catch(DBException& e) {
- failedAttempts++;
- str::stream msg;
- msg << "initial sync exception: ";
- msg << e.toString() << " " << (maxFailedAttempts - failedAttempts) << " attempts remaining" ;
- sethbmsg(msg, 0);
- sleepsecs(30);
- }
- }
- fassert( 16233, failedAttempts < maxFailedAttempts);
- }
-
- bool ReplSetImpl::_initialSyncClone(OperationContext* txn,
- Cloner& cloner,
- const std::string& host,
- const list<string>& dbs,
- bool dataPass) {
+ bool _initialSyncClone(OperationContext* txn,
+ Cloner& cloner,
+ const std::string& host,
+ const list<string>& dbs,
+ bool dataPass) {
for( list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++ ) {
const string db = *i;
@@ -103,9 +66,9 @@ namespace repl {
continue;
if ( dataPass )
- sethbmsg( str::stream() << "initial sync cloning db: " << db , 0);
+ log() << "initial sync cloning db: " << db;
else
- sethbmsg( str::stream() << "initial sync cloning indexes for : " << db , 0);
+ log() << "initial sync cloning indexes for : " << db;
string err;
int errCode;
@@ -124,10 +87,9 @@ namespace repl {
Lock::DBLock dbWrite(txn->lockState(), db, newlm::MODE_X);
if (!cloner.go(txn, db, host, options, NULL, err, &errCode)) {
- sethbmsg(str::stream() << "initial sync: error while "
- << (dataPass ? "cloning " : "indexing ") << db
- << ". " << (err.empty() ? "" : err + ". ")
- << "sleeping 5 minutes" ,0);
+ log() << "initial sync: error while "
+ << (dataPass ? "cloning " : "indexing ") << db
+ << ". " << (err.empty() ? "" : err + ". ");
return false;
}
}
@@ -135,140 +97,6 @@ namespace repl {
return true;
}
- static void emptyOplog(OperationContext* txn) {
- Client::WriteContext ctx(txn, rsoplog);
-
- Collection* collection = ctx.ctx().db()->getCollection(txn, rsoplog);
-
- // temp
- if( collection->numRecords(txn) == 0 )
- return; // already empty, ok.
-
- LOG(1) << "replSet empty oplog" << rsLog;
- uassertStatusOK( collection->truncate(txn) );
- ctx.commit();
- }
-
- const Member* ReplSetImpl::getMemberToSyncTo() {
- lock lk(this);
-
- // if we have a target we've requested to sync from, use it
-
- if (_forceSyncTarget) {
- Member* target = _forceSyncTarget;
- _forceSyncTarget = 0;
- sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0);
- return target;
- }
-
- const Member* primary = box.getPrimary();
-
- // wait for 2N pings before choosing a sync target
- if (_cfg) {
- int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings;
-
- if (needMorePings > 0) {
- OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl;
- return NULL;
- }
-
- // If we are only allowed to sync from the primary, return that
- if (!_cfg->chainingAllowed()) {
- // Returns NULL if we cannot reach the primary
- return primary;
- }
- }
-
- // find the member with the lowest ping time that has more data than me
-
- // Find primary's oplog time. Reject sync candidates that are more than
- // maxSyncSourceLagSecs seconds behind.
- OpTime primaryOpTime;
- if (primary)
- primaryOpTime = primary->hbinfo().opTime;
- else
- // choose a time that will exclude no candidates, since we don't see a primary
- primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
-
- if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) {
- // erh - I think this means there was just a new election
- // and we don't yet know the new primary's optime
- primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
- }
-
- OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0);
-
- Member *closest = 0;
- time_t now = 0;
-
- // Make two attempts. The first attempt, we ignore those nodes with
- // slave delay higher than our own. The second attempt includes such
- // nodes, in case those are the only ones we can reach.
- // This loop attempts to set 'closest'.
- for (int attempts = 0; attempts < 2; ++attempts) {
- for (Member *m = _members.head(); m; m = m->next()) {
- if (!m->syncable())
- continue;
-
- if (m->state() == MemberState::RS_SECONDARY) {
- // only consider secondaries that are ahead of where we are
- if (m->hbinfo().opTime <= lastOpTimeWritten)
- continue;
- // omit secondaries that are excessively behind, on the first attempt at least.
- if (attempts == 0 &&
- m->hbinfo().opTime < oldestSyncOpTime)
- continue;
- }
-
- // omit nodes that are more latent than anything we've already considered
- if (closest &&
- (m->hbinfo().ping > closest->hbinfo().ping))
- continue;
-
- if (attempts == 0 &&
- (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) {
- continue; // skip this one in the first attempt
- }
-
- map<string,time_t>::iterator vetoed = _veto.find(m->fullName());
- if (vetoed != _veto.end()) {
- // Do some veto housekeeping
- if (now == 0) {
- now = time(0);
- }
-
- // if this was on the veto list, check if it was vetoed in the last "while".
- // if it was, skip.
- if (vetoed->second >= now) {
- if (time(0) % 5 == 0) {
- log() << "replSet not trying to sync from " << (*vetoed).first
- << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog;
- }
- continue;
- }
- _veto.erase(vetoed);
- // fall through, this is a valid candidate now
- }
- // This candidate has passed all tests; set 'closest'
- closest = m;
- }
- if (closest) break; // no need for second attempt
- }
-
- if (!closest) {
- return NULL;
- }
-
- sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);
-
- return closest;
- }
-
- void ReplSetImpl::veto(const string& host, const unsigned secs) {
- lock lk(this);
- _veto[host] = time(0)+secs;
- }
-
/**
* Replays the sync target's oplog from lastOp to the latest op on the sync target.
*
@@ -277,11 +105,10 @@ namespace repl {
* @param source the sync target
* @return if applying the oplog succeeded
*/
- bool ReplSetImpl::_initialSyncApplyOplog( OperationContext* ctx,
- repl::SyncTail& syncer,
- OplogReader* r,
- const Member* source) {
- const OpTime startOpTime = lastOpTimeWritten;
+ bool _initialSyncApplyOplog( OperationContext* ctx,
+ repl::SyncTail& syncer,
+ OplogReader* r) {
+ const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime();
BSONObj lastOp;
try {
// It may have been a long time since we last used this connection to
@@ -291,16 +118,22 @@ namespace repl {
// Solution is to increase the TCP keepalive frequency.
lastOp = r->getLastOp(rsoplog);
} catch ( SocketException & ) {
- log() << "connection lost to " << source->h().toString() << "; is your tcp keepalive interval set appropriately?";
- if( !r->connect(source->h()) ) {
- sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0);
+ HostAndPort host = r->getHost();
+ log() << "connection lost to " << host.toString() <<
+ "; is your tcp keepalive interval set appropriately?";
+ if( !r->connect(host) ) {
+ error() << "initial sync couldn't connect to " << host.toString();
throw;
}
// retry
lastOp = r->getLastOp(rsoplog);
}
- isyncassert( "lastOp is empty ", !lastOp.isEmpty() );
+ if (lastOp.isEmpty()) {
+ error() << "initial sync lastOp is empty";
+ sleepsecs(1);
+ return false;
+ }
OpTime stopOpTime = lastOp["ts"]._opTime();
@@ -320,7 +153,7 @@ namespace repl {
<< rsLog;
getGlobalReplicationCoordinator()->setMyLastOptime(ctx, OpTime());
- lastH = 0;
+ BackgroundSync::get()->setLastHash(0);
sleepsecs(5);
return false;
@@ -350,42 +183,36 @@ namespace repl {
* this member should have consistent data. 8 is "cosmetic," it is only to get this member
* closer to the latest op time before it can transition out of startup state
*/
- void ReplSetImpl::_initialSync() {
- InitialSync init(BackgroundSync::get());
- SyncTail tail(BackgroundSync::get(), multiSyncApply);
- sethbmsg("initial sync pending",0);
-
- // if this is the first node, it may have already become primary
- if ( box.getState().primary() ) {
- sethbmsg("I'm already primary, no need for initial sync",0);
- return;
- }
-
- const Member *source = getMemberToSyncTo();
- if (!source) {
- sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0);
- sleepsecs(15);
- return;
- }
+ void _initialSync() {
+ BackgroundSync* bgsync(BackgroundSync::get());
+ InitialSync init(bgsync);
+ SyncTail tail(bgsync, multiSyncApply);
+ log() << "initial sync pending";
- string sourceHostname = source->h().toString();
- init.setHostname(sourceHostname);
OplogReader r;
- if( !r.connect(source->h()) ) {
- sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0);
- sleepsecs(15);
+ OpTime now(Milliseconds(curTimeMillis64()).total_seconds(), 0);
+ OperationContextImpl txn;
+
+ ReplicationCoordinator* replCoord(getGlobalReplicationCoordinator());
+
+ // We must prime the sync source selector so that it considers all candidates regardless
+ // of oplog position, by passing in "now" as the last op fetched time.
+ r.connectToSyncSource(&txn, now, replCoord);
+ if (r.getHost().empty()) {
+ log() << "no valid sync sources found in current replset to do an initial sync";
+ sleepsecs(3);
return;
}
+ init.setHostname(r.getHost().toString());
+
BSONObj lastOp = r.getLastOp(rsoplog);
- if( lastOp.isEmpty() ) {
- sethbmsg("initial sync couldn't read remote oplog", 0);
+ if ( lastOp.isEmpty() ) {
+ log() << "initial sync couldn't read remote oplog";
sleepsecs(15);
return;
}
- OperationContextImpl txn;
-
if (getGlobalReplicationCoordinator()->getSettings().fastsync) {
log() << "fastsync: skipping database clone" << rsLog;
@@ -394,54 +221,49 @@ namespace repl {
_logOpObjRS(&txn, lastOp);
return;
}
- else {
- // Add field to minvalid document to tell us to restart initial sync if we crash
- setInitialSyncFlag(&txn);
- sethbmsg("initial sync drop all databases", 0);
- dropAllDatabasesExceptLocal(&txn);
+ // Add field to minvalid document to tell us to restart initial sync if we crash
+ setInitialSyncFlag(&txn);
- sethbmsg("initial sync clone all databases", 0);
+ log() << "initial sync drop all databases";
+ dropAllDatabasesExceptLocal(&txn);
- list<string> dbs = r.conn()->getDatabaseNames();
+ log() << "initial sync clone all databases";
- Cloner cloner;
- if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) {
- veto(source->fullName(), 600);
- sleepsecs(300);
- return;
- }
+ list<string> dbs = r.conn()->getDatabaseNames();
- sethbmsg("initial sync data copy, starting syncup",0);
+ Cloner cloner;
+ if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) {
+ return;
+ }
- // prime oplog
- init.syncApply(&txn, lastOp, false);
- _logOpObjRS(&txn, lastOp);
+ log() << "initial sync data copy, starting syncup";
- log() << "oplog sync 1 of 3" << endl;
- if (!_initialSyncApplyOplog(&txn, init, &r , source)) {
- return;
- }
+ // prime oplog
+ init.syncApply(&txn, lastOp, false);
+ _logOpObjRS(&txn, lastOp);
- // Now we sync to the latest op on the sync target _again_, as we may have recloned ops
- // that were "from the future" compared with minValid. During this second application,
- // nothing should need to be recloned.
- log() << "oplog sync 2 of 3" << endl;
- if (!_initialSyncApplyOplog(&txn, init, &r , source)) {
- return;
- }
- // data should now be consistent
+ log() << "oplog sync 1 of 3" << endl;
+ if (!_initialSyncApplyOplog(&txn, init, &r)) {
+ return;
+ }
- sethbmsg("initial sync building indexes",0);
- if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) {
- veto(source->fullName(), 600);
- sleepsecs(300);
- return;
- }
+ // Now we sync to the latest op on the sync target _again_, as we may have recloned ops
+ // that were "from the future" compared with minValid. During this second application,
+ // nothing should need to be recloned.
+ log() << "oplog sync 2 of 3" << endl;
+ if (!_initialSyncApplyOplog(&txn, init, &r)) {
+ return;
+ }
+ // data should now be consistent
+
+ log() << "initial sync building indexes";
+ if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) {
+ return;
}
log() << "oplog sync 3 of 3" << endl;
- if (!_initialSyncApplyOplog(&txn, tail, &r, source)) {
+ if (!_initialSyncApplyOplog(&txn, tail, &r)) {
return;
}
@@ -453,17 +275,12 @@ namespace repl {
return;
}
- sethbmsg("initial sync finishing up",0);
-
- verify( !box.getState().primary() ); // wouldn't make sense if we were.
+ log() << "initial sync finishing up";
{
Client::WriteContext cx(&txn, "local.");
-
- try {
- log() << "replSet set minValid=" << lastOpTimeWritten << rsLog;
- }
- catch(...) { }
+ OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime());
+ log() << "replSet set minValid=" << lastOpTimeWritten << rsLog;
// Initial sync is now complete. Flag this by setting minValid to the last thing
// we synced.
@@ -471,18 +288,39 @@ namespace repl {
// Clear the initial sync flag.
clearInitialSyncFlag(&txn);
+ BackgroundSync::get()->setInitialSyncRequestedFlag(false);
cx.commit();
}
- {
- boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex);
- theReplSet->initialSyncRequested = false;
- }
// If we just cloned & there were no ops applied, we still want the primary to know where
// we're up to
- BackgroundSync::notify();
+ bgsync->notify();
+
+ log() << "initial sync done";
+ }
+} // namespace
+
+ void syncDoInitialSync() {
+ static const int maxFailedAttempts = 10;
+
+ OperationContextImpl txn;
+ createOplog(&txn);
- sethbmsg("initial sync done",0);
+ int failedAttempts = 0;
+ while ( failedAttempts < maxFailedAttempts ) {
+ try {
+ _initialSync();
+ break;
+ }
+ catch(DBException& e) {
+ failedAttempts++;
+ mongoutils::str::stream msg;
+ error() << "initial sync exception: " << e.toString() << " " <<
+ (maxFailedAttempts - failedAttempts) << " attempts remaining";
+ sleepsecs(5);
+ }
+ }
+ fassert( 16233, failedAttempts < maxFailedAttempts);
}
} // namespace repl