diff options
author | Eric Milkie <milkie@10gen.com> | 2014-09-23 14:46:55 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2014-09-26 11:27:06 -0400 |
commit | e86e08deff7293b5778fad27df9031c013595b12 (patch) | |
tree | 9c24931717b261980a0591ab40192cbac9d101ce /src/mongo | |
parent | 128ef4c4bcf312fbe6339181e377d12744165cf9 (diff) | |
download | mongo-e86e08deff7293b5778fad27df9031c013595b12.tar.gz |
SERVER-15089 Add new Applier class and remove theReplSet references from BackgroundSync
Diffstat (limited to 'src/mongo')
36 files changed, 824 insertions, 787 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 1d85caacdd1..68557c4056f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -38,7 +38,6 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/repl_coordinator_impl.h" @@ -54,8 +53,11 @@ namespace mongo { namespace repl { +namespace { + const char hashFieldName[] = "h"; int SleepToAllowBatchingMillis = 2; const int BatchIsSmallish = 40000; // bytes +} // namespace MONGO_FP_DECLARE(rsBgSyncProduce); @@ -98,12 +100,12 @@ namespace repl { } BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), - _lastOpTimeFetched(0, 0), - _lastH(0), + _lastOpTimeFetched(std::numeric_limits<int>::max(), + 0), + _lastHash(0), _pause(true), _appliedBuffer(true), _assumingPrimary(false), - _currentSyncTarget(NULL), _replCoord(getGlobalReplicationCoordinator()) { } @@ -120,14 +122,12 @@ namespace repl { } void BackgroundSync::notify() { - { - boost::unique_lock<boost::mutex> lock(s_instance->_mutex); + boost::lock_guard<boost::mutex> lock(_mutex); - // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting) - if (s_instance->_buffer.empty()) { - s_instance->_appliedBuffer = true; - s_instance->_condvar.notify_all(); - } + // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting) + if (_buffer.empty()) { + _appliedBuffer = true; + _condvar.notify_all(); } } @@ -152,8 +152,6 @@ namespace repl { } void BackgroundSync::_producerThread() { - OperationContextImpl txn; - MemberState state = theReplSet->state(); // we want to pause when the state changes to primary @@ -170,10 +168,9 @@ namespace repl { return; } - // if this member has an empty oplog, we cannot start syncing - // Note: This logic is insane, but I will keep it here because if we can't - // connect the oplogreader for initial sync, it will be unlikely that we can connect - // the BGSync oplogreader. + OperationContextImpl txn; + + // We need to wait until initial sync has started. if (_replCoord->getMyLastOptime().isNull()) { sleepsecs(1); return; @@ -181,7 +178,7 @@ namespace repl { // we want to unpause when we're no longer primary // start() also loads _lastOpTimeFetched, which we know is set from the "if" else if (_pause) { - start(); + start(&txn); } produce(&txn); @@ -190,13 +187,10 @@ namespace repl { void BackgroundSync::produce(OperationContext* txn) { // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced - OplogReader r; - { boost::unique_lock<boost::mutex> lock(_mutex); if (_lastOpTimeFetched.isNull()) { // then we're initial syncing and we're still waiting for this to be set - _currentSyncTarget = NULL; lock.unlock(); sleepsecs(1); // if there is no one to sync from @@ -214,43 +208,49 @@ namespace repl { } - // find a target to sync from the last op time written - _replCoord->connectOplogReader(txn, this, &r); - + // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; + { + boost::unique_lock<boost::mutex> lock(_mutex); + lastOpTimeFetched = _lastOpTimeFetched; + _syncSourceHost = HostAndPort(); + } + _syncSourceReader.resetConnection(); + _syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord); { boost::unique_lock<boost::mutex> lock(_mutex); // no server found - if (_currentSyncTarget == NULL) { + if (_syncSourceReader.getHost().empty()) { lock.unlock(); sleepsecs(1); // if there is no one to sync from return; } lastOpTimeFetched = _lastOpTimeFetched; + _syncSourceHost = _syncSourceReader.getHost(); } - r.tailingQueryGTE(rsoplog, lastOpTimeFetched); + _syncSourceReader.tailingQueryGTE(rsoplog, lastOpTimeFetched); // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor - if (!r.haveCursor()) { + if (!_syncSourceReader.haveCursor()) { return; } - if (_rollbackIfNeeded(txn, r)) { + if (_rollbackIfNeeded(txn, _syncSourceReader)) { stop(); return; } while (!inShutdown()) { - if (!r.moreInCurrentBatch()) { + if (!_syncSourceReader.moreInCurrentBatch()) { // Check some things periodically // (whenever we run out of items in the // current cursor batch) - int bs = r.currentBatchMessageSize(); + int bs = _syncSourceReader.currentBatchMessageSize(); if( bs > 0 && bs < BatchIsSmallish ) { // on a very low latency network, if we don't wait a little, we'll be // getting ops to write almost one at a time. this will both be expensive @@ -273,35 +273,32 @@ namespace repl { } // re-evaluate quality of sync target - if (shouldChangeSyncTarget()) { + if (shouldChangeSyncSource()) { return; } - { //record time for each getmore TimerHolder batchTimer(&getmoreReplStats); // This calls receiveMore() on the oplogreader cursor. // It can wait up to five seconds for more data. - r.more(); + _syncSourceReader.more(); } - networkByteStats.increment(r.currentBatchMessageSize()); + networkByteStats.increment(_syncSourceReader.currentBatchMessageSize()); - if (!r.moreInCurrentBatch()) { + if (!_syncSourceReader.moreInCurrentBatch()) { // If there is still no data from upstream, check a few more things // and then loop back for another pass at getting more data { boost::unique_lock<boost::mutex> lock(_mutex); - if (_pause || - !_currentSyncTarget || - !_currentSyncTarget->hbinfo().hbstate.readable()) { + if (_pause) { return; } } - r.tailCheck(); - if( !r.haveCursor() ) { + _syncSourceReader.tailCheck(); + if( !_syncSourceReader.haveCursor() ) { LOG(1) << "replSet end syncTail pass" << rsLog; return; } @@ -312,7 +309,7 @@ namespace repl { // At this point, we are guaranteed to have at least one thing to read out // of the oplogreader cursor. - BSONObj o = r.nextSafe().getOwned(); + BSONObj o = _syncSourceReader.nextSafe().getOwned(); opsReadStats.increment(); { @@ -330,7 +327,7 @@ namespace repl { { boost::unique_lock<boost::mutex> lock(_mutex); - _lastH = o["h"].numberLong(); + _lastHash = o["h"].numberLong(); _lastOpTimeFetched = o["ts"]._opTime(); LOG(3) << "replSet lastOpTimeFetched: " << _lastOpTimeFetched.toStringPretty() << rsLog; @@ -338,17 +335,15 @@ namespace repl { } } - bool BackgroundSync::shouldChangeSyncTarget() { - boost::unique_lock<boost::mutex> lock(_mutex); - + bool BackgroundSync::shouldChangeSyncSource() { // is it even still around? - if (!_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) { + if (_syncSourceReader.getHost().empty()) { return true; } - // check other members: is any member's optime more than 30 seconds ahead of the guy we're - // syncing from? - return theReplSet->shouldChangeSyncTarget(_currentSyncTarget->hbinfo().opTime); + // check other members: is any member's optime more than MaxSyncSourceLag seconds + // ahead of the current sync source? + return _replCoord->shouldChangeSyncSource(_syncSourceReader.getHost()); } @@ -371,151 +366,24 @@ namespace repl { bufferSizeGauge.decrement(getSize(op)); } - bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) { + bool BackgroundSync::isStale(OpTime lastOpTimeFetched, + OplogReader& r, + BSONObj& remoteOldestOp) { remoteOldestOp = r.findOne(rsoplog, Query()); OpTime remoteTs = remoteOldestOp["ts"]._opTime(); - DEV { - log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog; - log() << "replSet lastOpTimeFetched: " << _lastOpTimeFetched.toStringLong() << rsLog; - } - { boost::unique_lock<boost::mutex> lock(_mutex); - if (_lastOpTimeFetched >= remoteTs) { + if (lastOpTimeFetched >= remoteTs) { return false; } + log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog; + log() << "replSet lastOpTimeFetched: " << lastOpTimeFetched.toStringLong() << rsLog; } return true; } - void BackgroundSync::getOplogReaderLegacy(OperationContext* txn, OplogReader* r) { - const Member *target = NULL, *stale = NULL; - BSONObj oldest; - - verify(r->conn() == NULL); - - while ((target = theReplSet->getMemberToSyncTo()) != NULL) { - string current = target->fullName(); - - if (!r->connect(target->h())) { - LOG(2) << "replSet can't connect to " << current << " to read operations" << rsLog; - r->resetConnection(); - theReplSet->veto(current); - sleepsecs(1); - continue; - } - - if (isStale(*r, oldest)) { - r->resetConnection(); - theReplSet->veto(current, 600); - stale = target; - continue; - } - - // if we made it here, the target is up and not stale - { - boost::unique_lock<boost::mutex> lock(_mutex); - // this will trigger the syncSourceFeedback - _currentSyncTarget = target; - } - - return; - } - - // the only viable sync target was stale - if (stale) { - theReplSet->goStale(txn, stale, oldest); - sleepsecs(120); - } - - { - boost::unique_lock<boost::mutex> lock(_mutex); - _currentSyncTarget = NULL; - } - - } - - void BackgroundSync::connectOplogReader(OperationContext* txn, - ReplicationCoordinatorImpl* replCoordImpl, - OplogReader* reader) { - OpTime lastOpTimeFetched; - { - boost::unique_lock<boost::mutex> lock(_mutex); - lastOpTimeFetched = _lastOpTimeFetched; - } - const OpTime sentinel(Milliseconds(curTimeMillis64()).total_seconds(), 0); - OpTime oldestOpTimeSeen = sentinel; - - while (true) { - HostAndPort candidate = replCoordImpl->chooseNewSyncSource(); - - if (candidate.empty()) { - if (oldestOpTimeSeen == sentinel) { - // If, in this invocation of connectOplogReader(), we did not successfully - // connect to any node ahead of us, - // we apparently have no sync sources to connect to. - // This situation is common; e.g. if there are no writes to the primary at - // the moment. - return; - } - - // Connected to at least one member, but in all cases we were too stale to use them - // as a sync source. - log() << "replSet error RS102 too stale to catch up" << rsLog; - log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog; - log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() << - rsLog; - log() << "replSet " - "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember" - << rsLog; - sethbmsg("error RS102 too stale to catch up"); - setMinValid(txn, oldestOpTimeSeen); - replCoordImpl->setFollowerMode(MemberState::RS_RECOVERING); - return; - } - - if (!reader->connect(candidate)) { - LOG(2) << "replSet can't connect to " << candidate.toString() << - " to read operations" << rsLog; - reader->resetConnection(); - replCoordImpl->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000)); - continue; - } - // Read the first (oldest) op and confirm that it's not newer than our last - // fetched op. Otherwise, we have fallen off the back of that source's oplog. - BSONObj remoteOldestOp(reader->findOne(rsoplog, Query())); - BSONElement tsElem(remoteOldestOp["ts"]); - if (tsElem.type() != Timestamp) { - // This member's got a bad op in its oplog. - warning() << "oplog invalid format on node " << candidate.toString(); - reader->resetConnection(); - replCoordImpl->blacklistSyncSource(candidate, - Date_t(curTimeMillis64() + 600*1000)); - continue; - } - OpTime remoteOldOpTime = tsElem._opTime(); - - if (lastOpTimeFetched < remoteOldOpTime) { - // We're too stale to use this sync source. - reader->resetConnection(); - replCoordImpl->blacklistSyncSource(candidate, - Date_t(curTimeMillis64() + 600*1000)); - if (oldestOpTimeSeen > remoteOldOpTime) { - warning() << "we are too stale to use " << candidate.toString() << - " as a sync source"; - oldestOpTimeSeen = remoteOldOpTime; - } - continue; - } - - // Got a valid sync source. - return; - } // while (true) - - } - bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) { string hn = r.conn()->getServerAddress(); @@ -547,8 +415,8 @@ namespace repl { BSONObj o = r.nextSafe(); OpTime ts = o["ts"]._opTime(); - long long h = o["h"].numberLong(); - if( ts != _lastOpTimeFetched || h != _lastH ) { + long long hash = o["h"].numberLong(); + if( ts != _lastOpTimeFetched || hash != _lastHash ) { log() << "replSet our last op time fetched: " << _lastOpTimeFetched.toStringPretty() << rsLog; log() << "replset source's GTE: " << ts.toStringPretty() << rsLog; syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord); @@ -558,27 +426,29 @@ namespace repl { return false; } - const Member* BackgroundSync::getSyncTarget() { + HostAndPort BackgroundSync::getSyncTarget() { boost::unique_lock<boost::mutex> lock(_mutex); - return _currentSyncTarget; + return _syncSourceHost; } void BackgroundSync::clearSyncTarget() { boost::unique_lock<boost::mutex> lock(_mutex); - _currentSyncTarget = NULL; + _syncSourceReader.resetConnection(); + _syncSourceHost = HostAndPort(); } void BackgroundSync::stop() { boost::unique_lock<boost::mutex> lock(_mutex); _pause = true; - _currentSyncTarget = NULL; + _syncSourceReader.resetConnection(); + _syncSourceHost = HostAndPort(); _lastOpTimeFetched = OpTime(0,0); - _lastH = 0; + _lastHash = 0; _condvar.notify_all(); } - void BackgroundSync::start() { + void BackgroundSync::start(OperationContext* txn) { massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty()); boost::unique_lock<boost::mutex> lock(_mutex); @@ -586,9 +456,11 @@ namespace repl { // reset _last fields with current data _lastOpTimeFetched = _replCoord->getMyLastOptime(); - _lastH = theReplSet->lastH; - LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastH << rsLog; + loadLastHash(txn); + + LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched << + " " << _lastHash << rsLog; } bool BackgroundSync::isAssumingPrimary() { @@ -611,5 +483,56 @@ namespace repl { _assumingPrimary = false; } + long long BackgroundSync::getLastHash() const { + boost::lock_guard<boost::mutex> lck(_mutex); + return _lastHash; + } + + void BackgroundSync::setLastHash(long long newHash) { + boost::lock_guard<boost::mutex> lck(_mutex); + _lastHash = newHash; + } + + void BackgroundSync::loadLastHash(OperationContext* txn) { + Lock::DBRead lk(txn->lockState(), rsoplog); + BSONObj oplogEntry; + try { + if (!Helpers::getLast(txn, rsoplog, oplogEntry)) { + // This can happen when we are to do an initial sync. lastHash will be set + // after the initial sync is complete. + _lastHash = 0; + return; + } + } + catch (const DBException& ex) { + severe() << "Problem reading " << rsoplog << ": " << ex.toStatus(); + fassertFailed(18904); + } + BSONElement hashElement = oplogEntry[hashFieldName]; + if (hashElement.eoo()) { + severe() << "Most recent entry in " << rsoplog << " missing \"" << hashFieldName << + "\" field"; + fassertFailed(18902); + } + if (hashElement.type() != NumberLong) { + severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << + rsoplog << " entry to have type NumberLong, but found " << + typeName(hashElement.type()); + fassertFailed(18903); + } + _lastHash = hashElement.safeNumberLong(); + } + + bool BackgroundSync::getInitialSyncRequestedFlag() { + boost::lock_guard<boost::mutex> lock(_initialSyncMutex); + return _initialSyncRequestedFlag; + } + + void BackgroundSync::setInitialSyncRequestedFlag(bool value) { + boost::lock_guard<boost::mutex> lock(_initialSyncMutex); + _initialSyncRequestedFlag = value; + } + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index eb88040f64c..45f87731200 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -39,7 +39,6 @@ namespace repl { class Member; class ReplicationCoordinator; - class ReplicationCoordinatorImpl; // This interface exists to facilitate easier testing; // the test infrastructure implements these functions with stubs. @@ -56,12 +55,6 @@ namespace repl { // called by sync thread after it has applied an op virtual void consume() = 0; - // Returns the member we're currently syncing from (or NULL) - virtual const Member* getSyncTarget() = 0; - - // Sets the member we're currently syncing from to be NULL - virtual void clearSyncTarget() = 0; - // wait up to 1 second for more ops to appear virtual void waitForMore() = 0; }; @@ -79,21 +72,26 @@ namespace repl { // protects creation of s_instance static boost::mutex s_mutex; - // _mutex protects all of the class variables - boost::mutex _mutex; - // Production thread BlockingQueue<BSONObj> _buffer; + OplogReader _syncSourceReader; + + // _mutex protects all of the class variables except _syncSourceReader and _buffer + mutable boost::mutex _mutex; OpTime _lastOpTimeFetched; - long long _lastH; + + // hash we use to make sure we are reading the right flow of ops and aren't on + // an out-of-date "fork" + long long _lastHash; + // if produce thread should be running bool _pause; bool _appliedBuffer; bool _assumingPrimary; boost::condition _condvar; - const Member* _currentSyncTarget; + HostAndPort _syncSourceHost; BackgroundSync(); BackgroundSync(const BackgroundSync& s); @@ -107,23 +105,28 @@ namespace repl { bool _rollbackIfNeeded(OperationContext* txn, OplogReader& r); // Evaluate if the current sync target is still good - bool shouldChangeSyncTarget(); + bool shouldChangeSyncSource(); // check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp. - bool isStale(OplogReader& r, BSONObj& remoteOldestOp); - // stop syncing when this becomes a primary - void stop(); + bool isStale(OpTime lastOpTimeFetched, OplogReader& r, BSONObj& remoteOldestOp); // restart syncing - void start(); + void start(OperationContext* txn); // A pointer to the replication coordinator running the show. ReplicationCoordinator* _replCoord; + // bool for indicating resync need on this node and the mutex that protects it + // The resync command sets this flag; the Applier thread observes and clears it. + bool _initialSyncRequestedFlag; + boost::mutex _initialSyncMutex; + public: + // stop syncing (when this node becomes a primary, e.g.) + void stop(); bool isAssumingPrimary(); static BackgroundSync* get(); - static void shutdown(); - static void notify(); + void shutdown(); + void notify(); virtual ~BackgroundSync() {} @@ -132,11 +135,12 @@ namespace repl { // starts the sync target notifying thread void notifierThread(); + HostAndPort getSyncTarget(); + // Interface implementation virtual bool peek(BSONObj* op); virtual void consume(); - virtual const Member* getSyncTarget(); virtual void clearSyncTarget(); virtual void waitForMore(); @@ -147,18 +151,12 @@ namespace repl { // primary. void stopReplicationAndFlushBuffer(); - /** - * Connects an oplog reader to a viable sync source. Legacy uses getOplogReaderLegacy(), - * which sets _currentSyncTarget as a side effect. - * connectOplogReader() is used in new replication. - * Both functions can affect the TopoCoord's blacklist of sync sources, and may set - * our minValid value, durably, if we detect we haven fallen off the back of all sync - * sources' oplogs. - **/ - void getOplogReaderLegacy(OperationContext* txn, OplogReader* reader); - void connectOplogReader(OperationContext* txn, - ReplicationCoordinatorImpl* replCoordImpl, - OplogReader* reader); + long long getLastHash() const; + void setLastHash(long long oldH); + void loadLastHash(OperationContext* txn); + + bool getInitialSyncRequestedFlag(); + void setInitialSyncRequestedFlag(bool value); }; diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp index 76bdedd97d7..5be259383d6 100644 --- a/src/mongo/db/repl/health.cpp +++ b/src/mongo/db/repl/health.cpp @@ -448,11 +448,11 @@ namespace repl { b.append("set", name()); b.appendTimeT("date", time(0)); b.append("myState", myState.s); - const Member *syncTarget = BackgroundSync::get()->getSyncTarget(); - if ( syncTarget && + const HostAndPort syncTarget = BackgroundSync::get()->getSyncTarget(); + if ( !syncTarget.empty() && (myState != MemberState::RS_PRIMARY) && (myState != MemberState::RS_REMOVED) ) { - b.append("syncingTo", syncTarget->fullName()); + b.append("syncingTo", syncTarget.toString()); } b.append("members", v); if( replSetBlind ) diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp index 54e05e82233..b563b4b6897 100644 --- a/src/mongo/db/repl/heartbeat.cpp +++ b/src/mongo/db/repl/heartbeat.cpp @@ -213,13 +213,13 @@ namespace { if (myConfig().arbiterOnly) { return; } - + // this ensures that will have bgsync's s_instance at all points where it is needed // so that we needn't check for its existence BackgroundSync* sync = BackgroundSync::get(); boost::thread t(startSyncThread); - + boost::thread producer(stdx::bind(&BackgroundSync::producerThread, sync)); //boost::thread feedback(stdx::bind(&SyncSourceFeedback::run, // &theReplSet->syncSourceFeedback)); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index f2a9eb30a64..4287580f3d1 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -55,7 +55,6 @@ #include "mongo/db/ops/delete.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/repl/write_concern.h" #include "mongo/db/stats/counters.h" #include "mongo/db/operation_context_impl.h" @@ -126,7 +125,7 @@ namespace repl { WriteUnitOfWork wunit(txn); const OpTime ts = op["ts"]._opTime(); - long long h = op["h"].numberLong(); + long long hash = op["h"].numberLong(); { if ( localOplogRSCollection == 0 ) { @@ -142,29 +141,20 @@ namespace repl { Client::Context ctx(txn, rsoplog, localDB); checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); - /* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy. - this code (or code in now() maybe) should be improved. - */ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) { - OpTime myLastOptime = replCoord->getMyLastOptime(); - if (!(myLastOptime < ts)) { - warning() << "replication oplog stream went back in time. previous timestamp: " - << myLastOptime << " newest timestamp: " << ts - << ". attempting to sync directly from primary." << endl; - BSONObjBuilder result; - HostAndPort targetHostAndPort = theReplSet->box.getPrimary()->h(); - Status status = replCoord->processReplSetSyncFrom(targetHostAndPort, &result); - if (!status.isOK()) { - error() << "Can't sync from primary: " << status; - } - } - theReplSet->lastH = h; - ctx.getClient()->setLastOp( ts ); - - replCoord->setMyLastOptime(txn, ts); - BackgroundSync::notify(); + OpTime myLastOptime = replCoord->getMyLastOptime(); + if (!(myLastOptime < ts)) { + severe() << "replication oplog stream went back in time. previous timestamp: " + << myLastOptime << " newest timestamp: " << ts; + fassertFailedNoTrace(18905); } + + BackgroundSync* bgsync = BackgroundSync::get(); + bgsync->setLastHash(hash); + ctx.getClient()->setLastOp( ts ); + + replCoord->setMyLastOptime(txn, ts); + bgsync->notify(); } setNewOptime(ts); @@ -252,26 +242,33 @@ namespace repl { resetSlaveCache(); return; } + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); mutex::scoped_lock lk2(newOpMutex); OpTime ts(getNextGlobalOptime()); newOptimeNotifier.notify_all(); - long long hashNew; - if( theReplSet ) { - if (!theReplSet->box.getState().primary()) { - log() << "replSet error : logOp() but not primary"; - fassertFailed(17405); - } - hashNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId(); + long long hashNew = BackgroundSync::get()->getLastHash(); + + // Check to make sure logOp() is legal at this point. + if (*opstr == 'n') { + // 'n' operations are always logged + invariant(*ns == '\0'); + + // 'n' operations do not advance the hash, since they are not rolled back } else { - // must be initiation - verify( *ns == 0 ); - hashNew = 0; + if (!replCoord->canAcceptWritesForDatabase(nsToDatabaseSubstring(ns))) { + severe() << "replSet error : logOp() but can't accept write to collection " << ns; + fassertFailed(17405); + } + + // Advance the hash + hashNew = (hashNew * 131 + ts.asLL()) * 17 + replCoord->getMyId(); } + /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- instead we do a single copy to the destination position in the memory mapped file. */ @@ -305,12 +302,10 @@ namespace repl { OplogDocWriter writer( partial, obj ); checkOplogInsert( localOplogRSCollection->insertDocument( txn, &writer, false ) ); - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { - theReplSet->lastH = hashNew; - ctx.getClient()->setLastOp( ts ); - replCoord->setMyLastOptime(txn, ts); - } + BackgroundSync::get()->setLastHash(hashNew); + ctx.getClient()->setLastOp( ts ); + replCoord->setMyLastOptime(txn, ts); + wunit.commit(); } diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 23997eabeb3..eb542669ccf 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -40,11 +40,14 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/auth/security_key.h" +#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" -#include "mongo/db/repl/rs.h" // theReplSet +#include "mongo/db/repl/minvalid.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_coordinator.h" +#include "mongo/db/repl/rslog.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -135,5 +138,79 @@ namespace repl { return _host; } + void OplogReader::connectToSyncSource(OperationContext* txn, + OpTime lastOpTimeFetched, + ReplicationCoordinator* replCoord) { + const OpTime sentinel(Milliseconds(curTimeMillis64()).total_seconds(), 0); + OpTime oldestOpTimeSeen = sentinel; + + invariant(conn() == NULL); + + while (true) { + HostAndPort candidate = replCoord->chooseNewSyncSource(); + + if (candidate.empty()) { + if (oldestOpTimeSeen == sentinel) { + // If, in this invocation of connectToSyncSource(), we did not successfully + // connect to any node ahead of us, + // we apparently have no sync sources to connect to. + // This situation is common; e.g. if there are no writes to the primary at + // the moment. + return; + } + + // Connected to at least one member, but in all cases we were too stale to use them + // as a sync source. + log() << "replSet error RS102 too stale to catch up" << rsLog; + log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog; + log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() << + rsLog; + log() << "replSet " + "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember" + << rsLog; + setMinValid(txn, oldestOpTimeSeen); + replCoord->setFollowerMode(MemberState::RS_RECOVERING); + return; + } + + if (!connect(candidate)) { + LOG(2) << "replSet can't connect to " << candidate.toString() << + " to read operations" << rsLog; + resetConnection(); + replCoord->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000)); + continue; + } + // Read the first (oldest) op and confirm that it's not newer than our last + // fetched op. Otherwise, we have fallen off the back of that source's oplog. + BSONObj remoteOldestOp(findOne(rsoplog, Query())); + BSONElement tsElem(remoteOldestOp["ts"]); + if (tsElem.type() != Timestamp) { + // This member's got a bad op in its oplog. + warning() << "oplog invalid format on node " << candidate.toString(); + resetConnection(); + replCoord->blacklistSyncSource(candidate, + Date_t(curTimeMillis64() + 600*1000)); + continue; + } + OpTime remoteOldOpTime = tsElem._opTime(); + + if (lastOpTimeFetched < remoteOldOpTime) { + // We're too stale to use this sync source. + resetConnection(); + replCoord->blacklistSyncSource(candidate, + Date_t(curTimeMillis64() + 600*1000)); + if (oldestOpTimeSeen > remoteOldOpTime) { + warning() << "we are too stale to use " << candidate.toString() << + " as a sync source"; + oldestOpTimeSeen = remoteOldOpTime; + } + continue; + } + + // Got a valid sync source. + return; + } // while (true) + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 884276035ee..f4434e596d7 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -40,6 +40,8 @@ namespace mongo { extern const BSONObj reverseNaturalObj; // { $natural : -1 } namespace repl { + class ReplicationCoordinator; + /** * Authenticates conn using the server's cluster-membership credentials. * @@ -131,6 +133,19 @@ namespace repl { void putBack(BSONObj op) { cursor->putBack(op); } HostAndPort getHost() const; + + /** + * Connects this OplogReader to a valid sync source, using the provided lastOpTimeFetched + * and ReplicationCoordinator objects. + * If this function fails to connect to a sync source that is viable, this OplogReader + * is left unconnected, where this->conn() equals NULL. + * In the process of connecting, this function may add items to the repl coordinator's + * sync source blacklist. + * This function may throw DB exceptions. + */ + void connectToSyncSource(OperationContext* txn, + OpTime lastOpTimeFetched, + ReplicationCoordinator* replCoord); }; } // namespace repl diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index 970bf646bba..46fdaaf8a51 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -272,6 +272,11 @@ namespace repl { virtual OID getMyRID() const = 0; /** + * Returns the id for this node as specified in the current replica set configuration. + */ + virtual int getMyId() const = 0; + + /** * Sets this node into a specific follower mode. * * It is an error to call this method if the node's topology coordinator would not @@ -495,14 +500,26 @@ namespace repl { virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) = 0; /** - * Connects an oplog reader to a viable sync source, using BackgroundSync object bgsync. - * When this function returns, reader is connected to a viable sync source or is left - * unconnected if no sync sources are currently available. In legacy, bgsync's - * _currentSyncTarget is also set appropriately. - **/ - virtual void connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* reader) = 0; + * Chooses a viable sync source, or, if none available, returns empty HostAndPort. + */ + virtual HostAndPort chooseNewSyncSource() = 0; + + /** + * Blacklists choosing 'host' as a sync source until time 'until'. + */ + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0; + + /** + * Loads the optime from the last op in the oplog into the coordinator's lastOpApplied + * value. + */ + virtual void resetLastOpTimeFromOplog(OperationContext* txn) = 0; + + /** + * Determines if a new sync source should be considered. + * currentSource: the current sync source + */ + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0; protected: diff --git a/src/mongo/db/repl/repl_coordinator_external_state.h b/src/mongo/db/repl/repl_coordinator_external_state.h index 73721cd5dd9..2b2fa45acd4 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state.h +++ b/src/mongo/db/repl/repl_coordinator_external_state.h @@ -58,16 +58,6 @@ namespace repl { class GlobalSharedLockAcquirer; class ScopedLocker; - /** - * Structure used to pass around information about oplog entry optimes and h values. - */ - struct OpTimeAndHash { - OpTimeAndHash() {} - OpTimeAndHash(OpTime ot, long long h) : opTime(ot), hash(h) {} - OpTime opTime; - long long hash; - }; - ReplicationCoordinatorExternalState(); virtual ~ReplicationCoordinatorExternalState(); @@ -121,10 +111,10 @@ namespace repl { virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config) = 0; /** - * Gets the last optime and hash of an operation performed on this host, from stable + * Gets the last optime of an operation performed on this host, from stable * storage. */ - virtual StatusWith<OpTimeAndHash> loadLastOpTimeAndHash(OperationContext* txn) = 0; + virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn) = 0; /** * Returns the HostAndPort of the remote client connected to us that initiated the operation diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp index dd78ae482b4..9ec360369ab 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp @@ -41,6 +41,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/connections.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/oplog.h" @@ -59,7 +60,6 @@ namespace { const char meCollectionName[] = "local.me"; const char meDatabaseName[] = "local"; const char tsFieldName[] = "ts"; - const char hashFieldName[] = "h"; } // namespace ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() {} @@ -71,6 +71,11 @@ namespace { void ReplicationCoordinatorExternalStateImpl::shutdown() { _syncSourceFeedback.shutdown(); + BackgroundSync* bgsync = BackgroundSync::get(); + // bgsync can be null if we shut down prior to installing our initial replset config. + if (bgsync) { + bgsync->shutdown(); + } } void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake() { @@ -141,37 +146,35 @@ namespace { } } - StatusWith<ReplicationCoordinatorExternalState::OpTimeAndHash> - ReplicationCoordinatorExternalStateImpl::loadLastOpTimeAndHash( + StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( OperationContext* txn) { try { Lock::DBRead lk(txn->lockState(), rsoplog); BSONObj oplogEntry; if (!Helpers::getLast(txn, rsoplog, oplogEntry)) { - return StatusWith<OpTimeAndHash>( + return StatusWith<OpTime>( ErrorCodes::NoMatchingDocument, str::stream() << "Did not find any entries in " << rsoplog); } BSONElement tsElement = oplogEntry[tsFieldName]; if (tsElement.eoo()) { - return StatusWith<OpTimeAndHash>( + return StatusWith<OpTime>( ErrorCodes::NoSuchKey, str::stream() << "Most recent entry in " << rsoplog << " missing \"" << tsFieldName << "\" field"); } if (tsElement.type() != Timestamp) { - return StatusWith<OpTimeAndHash>( + return StatusWith<OpTime>( ErrorCodes::TypeMismatch, str::stream() << "Expected type of \"" << tsFieldName << "\" in most recent " << rsoplog << " entry to have type Timestamp, but found " << typeName(tsElement.type())); } - return StatusWith<OpTimeAndHash>( - OpTimeAndHash(tsElement._opTime(), oplogEntry[hashFieldName].safeNumberLong())); + return StatusWith<OpTime>(tsElement._opTime()); } catch (const DBException& ex) { - return StatusWith<OpTimeAndHash>(ex.toStatus()); + return StatusWith<OpTime>(ex.toStatus()); } } diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.h b/src/mongo/db/repl/repl_coordinator_external_state_impl.h index 67740b87735..01ebc44a124 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.h @@ -50,7 +50,7 @@ namespace repl { virtual bool isSelf(const HostAndPort& host); virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config); - virtual StatusWith<OpTimeAndHash> loadLastOpTimeAndHash(OperationContext* txn); + virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); virtual void closeClientConnections(); virtual ReplicationCoordinatorExternalState::GlobalSharedLockAcquirer* diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp index 85b7f82fdd5..0465ae5cc2a 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp @@ -43,7 +43,7 @@ namespace repl { ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() : _localRsConfigDocument(ErrorCodes::NoMatchingDocument, "No local config document"), - _lastOpTimeAndHash(ErrorCodes::NoMatchingDocument, "No last oplog entry"), + _lastOpTime(ErrorCodes::NoMatchingDocument, "No last oplog entry"), _canAcquireGlobalSharedLock(true), _connectionsClosed(false) { } @@ -95,16 +95,14 @@ namespace repl { _localRsConfigDocument = localConfigDocument; } - StatusWith<ReplicationCoordinatorExternalState::OpTimeAndHash> - ReplicationCoordinatorExternalStateMock::loadLastOpTimeAndHash( - OperationContext* txn) { - return _lastOpTimeAndHash; + StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime( + OperationContext* txn) { + return _lastOpTime; } - void ReplicationCoordinatorExternalStateMock::setLastOpTimeAndHash( - const StatusWith<OpTimeAndHash>& lastApplied) { - - _lastOpTimeAndHash = lastApplied; + void ReplicationCoordinatorExternalStateMock::setLastOpTime( + const StatusWith<OpTime>& lastApplied) { + _lastOpTime = lastApplied; } void ReplicationCoordinatorExternalStateMock::closeClientConnections() { diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.h b/src/mongo/db/repl/repl_coordinator_external_state_mock.h index 155ec979000..d10fcc075a2 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.h @@ -57,7 +57,7 @@ namespace repl { virtual HostAndPort getClientHostAndPort(const OperationContext* txn); virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config); - virtual StatusWith<OpTimeAndHash> loadLastOpTimeAndHash(OperationContext* txn); + virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn); virtual void closeClientConnections(); virtual ReplicationCoordinatorExternalState::GlobalSharedLockAcquirer* getGlobalSharedLockAcquirer(); @@ -88,11 +88,11 @@ namespace repl { /** * Sets the return value for subsequent calls to loadLastOpTimeApplied. */ - void setLastOpTimeAndHash(const StatusWith<OpTimeAndHash>& lastApplied); + void setLastOpTime(const StatusWith<OpTime>& lastApplied); private: StatusWith<BSONObj> _localRsConfigDocument; - StatusWith<OpTimeAndHash> _lastOpTimeAndHash; + StatusWith<OpTime> _lastOpTime; std::vector<HostAndPort> _selfHosts; bool _canAcquireGlobalSharedLock; bool _connectionsClosed; diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index c9555d0a432..253848c92ff 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -190,19 +190,26 @@ namespace repl { Status HybridReplicationCoordinator::setMyLastOptime(OperationContext* txn, const OpTime& ts) { Status legacyStatus = _legacy.setMyLastOptime(txn, ts); - Status implStatus = _impl.setMyLastOptime(txn, ts); - if (legacyStatus.code() != implStatus.code()) { - warning() << "Hybrid response difference in setMyLastOptime. Legacy response: " - << legacyStatus << ", impl response: " << implStatus; + // Currently, the legacy code calls logOp before we have a config, which + // means we can't set the last optime in the impl because we have no place to store it. + if (getReplicationMode() == ReplicationCoordinator::modeReplSet || + getReplicationMode() == ReplicationCoordinator::modeMasterSlave) { + Status implStatus = _impl.setMyLastOptime(txn, ts); + if (legacyStatus.code() != implStatus.code()) { + warning() << "Hybrid response difference in setMyLastOptime. Legacy response: " + << legacyStatus << ", impl response: " << implStatus; + } + fassert(18666, legacyStatus.code() == implStatus.code()); } - fassert(18666, legacyStatus.code() == implStatus.code()); return legacyStatus; } OpTime HybridReplicationCoordinator::getMyLastOptime() const { - _legacy.getMyLastOptime(); - OpTime implOpTime = _impl.getMyLastOptime(); - return implOpTime; + OpTime legacyOpTime = _legacy.getMyLastOptime(); + _impl.getMyLastOptime(); + // Returning the legacy one for now, because at startup we can only set the legacy and not + // the impl (see comment in setMyLastOptime() above). + return legacyOpTime; } OID HybridReplicationCoordinator::getElectionId() { @@ -217,6 +224,10 @@ namespace repl { return legacyRID; } + int HybridReplicationCoordinator::getMyId() const { + return _impl.getMyId(); + } + void HybridReplicationCoordinator::setFollowerMode(const MemberState& newState) { _legacy.setFollowerMode(newState); _impl.setFollowerMode(newState); @@ -430,18 +441,25 @@ namespace repl { return legacyResponse; } - void HybridReplicationCoordinator::connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r) { - _legacy.connectOplogReader(txn, bgsync, r); - HostAndPort legacySyncSource = r->getHost(); - bgsync->connectOplogReader(txn, &_impl, r); - HostAndPort implSyncSource = r->getHost(); - if (legacySyncSource != implSyncSource) { - severe() << "sync source mismatch between legacy and impl: " << - legacySyncSource.toString() << " and " << implSyncSource.toString(); - fassertFailed(18742); - } + HostAndPort HybridReplicationCoordinator::chooseNewSyncSource() { + return _legacy.chooseNewSyncSource(); + //return _impl.chooseNewSyncSource(); + } + + void HybridReplicationCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) { + _legacy.blacklistSyncSource(host, until); + _impl.blacklistSyncSource(host, until); + } + + void HybridReplicationCoordinator::resetLastOpTimeFromOplog(OperationContext* txn) { + _impl.resetLastOpTimeFromOplog(txn); + _legacy.resetLastOpTimeFromOplog(txn); + } + + bool HybridReplicationCoordinator::shouldChangeSyncSource(const HostAndPort& currentSource) { + // Doesn't yet return the correct answer because we have no heartbeats. + _impl.shouldChangeSyncSource(currentSource); + return _legacy.shouldChangeSyncSource(currentSource); } void HybridReplicationCoordinator::setImplConfigHack(const ReplSetConfig* config) { diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h index d1b1c41e54b..ed33881db34 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.h +++ b/src/mongo/db/repl/repl_coordinator_hybrid.h @@ -102,6 +102,8 @@ namespace repl { virtual OID getMyRID() const; + virtual int getMyId() const; + virtual void setFollowerMode(const MemberState& newState); virtual bool isWaitingForApplierToDrain(); @@ -169,9 +171,13 @@ namespace repl { virtual bool isReplEnabled() const; - virtual void connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r); + virtual HostAndPort chooseNewSyncSource(); + + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + + virtual void resetLastOpTimeFromOplog(OperationContext* txn); + + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); /** * This is a temporary hack to force _impl to set its replset config to the one loaded by diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 77be9c92a2c..57ef45b0d2b 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -162,15 +162,14 @@ namespace { return true; } - StatusWith<ReplicationCoordinatorExternalState::OpTimeAndHash> lastOpTimeStatus = - _externalState->loadLastOpTimeAndHash(txn); + StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); OpTime lastOpTime(0, 0); if (!lastOpTimeStatus.isOK()) { warning() << "Failed to load timestamp of most recently applied operation; " << lastOpTimeStatus.getStatus(); } else { - lastOpTime = lastOpTimeStatus.getValue().opTime; + lastOpTime = lastOpTimeStatus.getValue(); } // Use a callback here, because _finishLoadLocalConfig calls isself() which requires @@ -871,6 +870,16 @@ namespace { return _myRID; } + int ReplicationCoordinatorImpl::getMyId() const { + boost::lock_guard<boost::mutex> lock(_mutex); + return _getMyId_inlock(); + } + + int ReplicationCoordinatorImpl::_getMyId_inlock() const { + const MemberConfig& self = _rsConfig.getMemberAt(_thisMembersConfigIndex); + return self.getId(); + } + void ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand( OperationContext* txn, BSONObjBuilder* cmdBuilder) { @@ -1503,12 +1512,6 @@ namespace { return _settings.usingReplSets() || _settings.master || _settings.slave; } - void ReplicationCoordinatorImpl::connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r) { - invariant(false); - } - void ReplicationCoordinatorImpl::_chooseNewSyncSource( const ReplicationExecutor::CallbackData& cbData, HostAndPort* newSyncSource) { @@ -1557,5 +1560,45 @@ namespace { _replExecutor.wait(cbh.getValue()); } + void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) { + StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); + OpTime lastOpTime(0, 0); + if (!lastOpTimeStatus.isOK()) { + warning() << "Failed to load timestamp of most recently applied operation; " << + lastOpTimeStatus.getStatus(); + } + else { + lastOpTime = lastOpTimeStatus.getValue(); + } + boost::unique_lock<boost::mutex> lk(_mutex); + _setLastOptime_inlock(&lk, _getMyRID_inlock(), lastOpTime); + } + + void ReplicationCoordinatorImpl::_shouldChangeSyncSource( + const ReplicationExecutor::CallbackData& cbData, + const HostAndPort& currentSource, + bool* shouldChange) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + *shouldChange = _topCoord->shouldChangeSyncSource(currentSource); + } + + bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) { + bool shouldChange(false); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource, + this, + stdx::placeholders::_1, + currentSource, + &shouldChange)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; + } + fassert(18906, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return shouldChange; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index 870c79ebbba..f2fba19bb40 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -136,6 +136,8 @@ namespace repl { virtual OID getMyRID() const; + virtual int getMyId() const; + virtual void setFollowerMode(const MemberState& newState); virtual bool isWaitingForApplierToDrain(); @@ -203,11 +205,14 @@ namespace repl { virtual bool isReplEnabled() const; - virtual void connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r); + virtual HostAndPort chooseNewSyncSource(); + + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + + virtual void resetLastOpTimeFromOplog(OperationContext* txn); + + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); - // ================== Members of replication code internal API =================== // This is a temporary hack to set the replset config to the config detected by the @@ -231,21 +236,6 @@ namespace repl { */ void cancelHeartbeats(); - /** - * Chooses a sync source. - * A wrapper that schedules _chooseNewSyncSource() through the Replication Executor and - * waits for its completion. - */ - HostAndPort chooseNewSyncSource(); - - /** - * Blacklists 'host' until 'until'. - * A wrapper that schedules _blacklistSyncSource() through the Replication Executor and - * waits for its completion. - */ - void blacklistSyncSource(const HostAndPort& host, Date_t until); - - // ================== Test support API =================== /** @@ -408,6 +398,8 @@ namespace repl { OID _getMyRID_inlock() const; + int _getMyId_inlock() const; + /** * Bottom half of setFollowerMode. */ @@ -532,7 +524,14 @@ namespace repl { void _blacklistSyncSource(const ReplicationExecutor::CallbackData& cbData, const HostAndPort& host, Date_t until); - + /** + * Determines if a new sync source should be considered. + * + * Must be scheduled as a callback. + */ + void _shouldChangeSyncSource(const ReplicationExecutor::CallbackData& cbData, + const HostAndPort& currentSource, + bool* shouldChange); // // All member variables are labeled with one of the following codes indicating the diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index 48f0797a483..8df815c60ed 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -99,9 +99,6 @@ namespace repl { } void LegacyReplicationCoordinator::shutdown() { - if (getReplicationMode() == modeReplSet) { - theReplSet->shutdown(); - } } ReplSettings& LegacyReplicationCoordinator::getSettings() { @@ -424,7 +421,9 @@ namespace { boost::lock_guard<boost::mutex> lock(_mutex); SlaveOpTimeMap::const_iterator it(_slaveOpTimeMap.find(_myRID)); - invariant(it != _slaveOpTimeMap.end()); + if (it == _slaveOpTimeMap.end()) { + return OpTime(0,0); + } OpTime legacyMapOpTime = it->second; OpTime legacyOpTime = theReplSet->lastOpTimeWritten; // TODO(emilkie): SERVER-15209 @@ -444,6 +443,11 @@ namespace { return _myRID; } + int LegacyReplicationCoordinator::getMyId() const { + invariant(false); + return 0; + } + void LegacyReplicationCoordinator::setFollowerMode(const MemberState& newState) { theReplSet->changeState(newState); } @@ -600,9 +604,9 @@ namespace { response->setHbMsg(theReplSet->hbmsg()); response->setTime(Seconds(time(0))); response->setOpTime(theReplSet->lastOpTimeWritten.asDate()); - const Member *syncTarget = BackgroundSync::get()->getSyncTarget(); - if (syncTarget) { - response->setSyncingTo(syncTarget->fullName()); + const HostAndPort syncTarget = BackgroundSync::get()->getSyncTarget(); + if (!syncTarget.empty()) { + response->setSyncingTo(syncTarget.toString()); } int v = theReplSet->config().version; @@ -1028,12 +1032,27 @@ namespace { return _settings.usingReplSets() || _settings.slave || _settings.master; } - void LegacyReplicationCoordinator::connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r) { - bgsync->getOplogReaderLegacy(txn, r); + HostAndPort LegacyReplicationCoordinator::chooseNewSyncSource() { + const Member* member = theReplSet->getMemberToSyncTo(); + if (member) { + return member->h(); + } + else { + return HostAndPort(); + } + } + + void LegacyReplicationCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) { + theReplSet->veto(host.toString(), until); } + void LegacyReplicationCoordinator::resetLastOpTimeFromOplog(OperationContext* txn) { + theReplSet->loadLastOpTimeWritten(txn, false); + } + + bool LegacyReplicationCoordinator::shouldChangeSyncSource(const HostAndPort& currentSource) { + return theReplSet->shouldChangeSyncTarget(currentSource); + } } // namespace repl diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h index 8d724ba469c..3d354c60ef5 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ b/src/mongo/db/repl/repl_coordinator_legacy.h @@ -98,6 +98,8 @@ namespace repl { virtual OID getMyRID() const; + virtual int getMyId() const; + virtual void setFollowerMode(const MemberState& newState); virtual bool isWaitingForApplierToDrain(); @@ -165,9 +167,13 @@ namespace repl { virtual bool isReplEnabled() const; - virtual void connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r); + virtual HostAndPort chooseNewSyncSource(); + + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + + virtual void resetLastOpTimeFromOplog(OperationContext* txn); + + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); private: diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp index 686ca9cebad..51353d897ed 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_mock.cpp @@ -141,6 +141,10 @@ namespace repl { return OID(); } + int ReplicationCoordinatorMock::getMyId() const { + return 0; + } + void ReplicationCoordinatorMock::setFollowerMode(const MemberState& newState) { } @@ -262,9 +266,20 @@ namespace repl { return Status::OK(); } - void ReplicationCoordinatorMock::connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r) { + HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource() { + invariant(false); + return HostAndPort(); + } + + void ReplicationCoordinatorMock::blacklistSyncSource(const HostAndPort& host, Date_t until) { + invariant(false); + } + + void ReplicationCoordinatorMock::resetLastOpTimeFromOplog(OperationContext* txn) { + invariant(false); + } + + bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource) { invariant(false); } diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h index e7c991a734e..63f7680ede2 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.h +++ b/src/mongo/db/repl/repl_coordinator_mock.h @@ -99,6 +99,8 @@ namespace repl { virtual OID getMyRID() const; + virtual int getMyId() const; + virtual void setFollowerMode(const MemberState& newState); virtual bool isWaitingForApplierToDrain(); @@ -164,9 +166,13 @@ namespace repl { virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); - virtual void connectOplogReader(OperationContext* txn, - BackgroundSync* bgsync, - OplogReader* r); + virtual HostAndPort chooseNewSyncSource(); + + virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); + + virtual void resetLastOpTimeFromOplog(OperationContext* txn); + + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource); private: diff --git a/src/mongo/db/repl/repl_set.h b/src/mongo/db/repl/repl_set.h index bb70c577627..f53a2fdcfbe 100644 --- a/src/mongo/db/repl/repl_set.h +++ b/src/mongo/db/repl/repl_set.h @@ -55,7 +55,6 @@ namespace repl { /* call after constructing to start - returns fairly quickly after launching its threads */ void go() { _go(); } - void shutdown(); virtual bool isPrimary() { return box.getState().primary(); } virtual bool isSecondary() { return box.getState().secondary(); } diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index ec066932801..ee6e5f240d0 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -385,7 +385,6 @@ namespace { _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); strcpy(_hbmsg , "initial startup"); - lastH = 0; changeState(MemberState::RS_STARTUP); _seeds = &replSetSeedList.seeds; @@ -437,7 +436,6 @@ namespace { _self(0), _maintenanceMode(0), mgr(0), - initialSyncRequested(false), // only used for resync _indexPrefetchConfig(PREFETCH_ALL) { } @@ -445,7 +443,6 @@ namespace { Lock::DBRead lk(txn->lockState(), rsoplog); BSONObj o; if (Helpers::getLast(txn, rsoplog, o)) { - lastH = o["h"].numberLong(); OpTime lastOpTime = o["ts"]._opTime(); uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTime.isNull()); getGlobalReplicationCoordinator()->setMyLastOptime(txn, lastOpTime); @@ -469,6 +466,8 @@ namespace { OperationContextImpl txn; try { + // Note: this sets lastOpTimeWritten, which the Applier uses to determine whether to + // do an initial sync or not. loadLastOpTimeWritten(&txn); } catch (std::exception& e) { @@ -494,6 +493,7 @@ namespace { } } + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_STARTUP2); startThreads(); newReplUp(); // oplog.cpp @@ -877,6 +877,125 @@ namespace { startupStatusMsg.set("? started"); startupStatus = STARTED; } + const Member* ReplSetImpl::getMemberToSyncTo() { + lock lk(this); + + // if we have a target we've requested to sync from, use it + + if (_forceSyncTarget) { + Member* target = _forceSyncTarget; + _forceSyncTarget = 0; + sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0); + return target; + } + + const Member* primary = box.getPrimary(); + + // wait for 2N pings before choosing a sync target + if (_cfg) { + int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; + + if (needMorePings > 0) { + OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; + return NULL; + } + + // If we are only allowed to sync from the primary, return that + if (!_cfg->chainingAllowed()) { + // Returns NULL if we cannot reach the primary + return primary; + } + } + + // find the member with the lowest ping time that has more data than me + + // Find primary's oplog time. Reject sync candidates that are more than + // maxSyncSourceLagSecs seconds behind. + OpTime primaryOpTime; + if (primary) + primaryOpTime = primary->hbinfo().opTime; + else + // choose a time that will exclude no candidates, since we don't see a primary + primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); + + if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) { + // erh - I think this means there was just a new election + // and we don't yet know the new primary's optime + primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); + } + + OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0); + + Member *closest = 0; + time_t now = 0; + + // Make two attempts. The first attempt, we ignore those nodes with + // slave delay higher than our own. The second attempt includes such + // nodes, in case those are the only ones we can reach. + // This loop attempts to set 'closest'. + for (int attempts = 0; attempts < 2; ++attempts) { + for (Member *m = _members.head(); m; m = m->next()) { + if (!m->syncable()) + continue; + + if (m->state() == MemberState::RS_SECONDARY) { + // only consider secondaries that are ahead of where we are + if (m->hbinfo().opTime <= lastOpTimeWritten) + continue; + // omit secondaries that are excessively behind, on the first attempt at least. + if (attempts == 0 && + m->hbinfo().opTime < oldestSyncOpTime) + continue; + } + + // omit nodes that are more latent than anything we've already considered + if (closest && + (m->hbinfo().ping > closest->hbinfo().ping)) + continue; + + if (attempts == 0 && + (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) { + continue; // skip this one in the first attempt + } + + map<string,time_t>::iterator vetoed = _veto.find(m->fullName()); + if (vetoed != _veto.end()) { + // Do some veto housekeeping + if (now == 0) { + now = time(0); + } + + // if this was on the veto list, check if it was vetoed in the last "while". + // if it was, skip. + if (vetoed->second >= now) { + if (time(0) % 5 == 0) { + log() << "replSet not trying to sync from " << (*vetoed).first + << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog; + } + continue; + } + _veto.erase(vetoed); + // fall through, this is a valid candidate now + } + // This candidate has passed all tests; set 'closest' + closest = m; + } + if (closest) break; // no need for second attempt + } + + if (!closest) { + return NULL; + } + + sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); + + return closest; + } + + void ReplSetImpl::veto(const string& host, const Date_t until) { + lock lk(this); + _veto[host] = until.toTimeT(); + } } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h index f6ccc591c9d..5664f6b1638 100644 --- a/src/mongo/db/repl/repl_set_impl.h +++ b/src/mongo/db/repl/repl_set_impl.h @@ -90,19 +90,16 @@ namespace repl { OpTime lastOpTimeWritten; OpTime getEarliestOpTimeWritten() const; - // hash we use to make sure we are reading the right flow of ops and aren't on - // an out-of-date "fork" - long long lastH; Status forceSyncFrom(const string& host, BSONObjBuilder* result); // Check if the current sync target is suboptimal. This must be called while holding a mutex // that prevents the sync source from changing. - bool shouldChangeSyncTarget(const OpTime& target) const; + bool shouldChangeSyncTarget(const HostAndPort& target) const; /** * Find the closest member (using ping time) with a higher latest optime. */ const Member* getMemberToSyncTo(); - void veto(const string& host, unsigned secs=10); + void veto(const string& host, Date_t until); bool gotForceSync(); void goStale(OperationContext* txn, const Member* m, const BSONObj& o); @@ -292,15 +289,14 @@ namespace repl { OplogReader* r, const Member* source); void _initialSync(); - void syncDoInitialSync(); void _syncThread(); void syncTail(); void syncFixUp(OperationContext* txn, FixUpInfo& h, OplogReader& r); + public: // keep a list of hosts that we've tried recently that didn't work map<string,time_t> _veto; - public: // Allow index prefetching to be turned on/off enum IndexPrefetchConfig { PREFETCH_NONE=0, PREFETCH_ID_ONLY=1, PREFETCH_ALL=2 @@ -312,10 +308,8 @@ namespace repl { IndexPrefetchConfig getIndexPrefetchConfig() { return _indexPrefetchConfig; } - const ReplSetConfig::MemberCfg& myConfig() const { return _config; } - void tryToGoLiveAsASecondary(OperationContext* txn); void syncThread(); const OpTime lastOtherOpTime() const; /** @@ -323,16 +317,9 @@ namespace repl { */ const OpTime lastOtherElectableOpTime() const; - // bool for indicating resync need on this node and the mutex that protects it - bool initialSyncRequested; - boost::mutex initialSyncMutex; - BSONObj getLastErrorDefault; private: IndexPrefetchConfig _indexPrefetchConfig; - - static const char* _initialSyncFlagString; - static const BSONObj _initialSyncFlag; }; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp index 440996e95e0..026814ee39e 100644 --- a/src/mongo/db/repl/rs.cpp +++ b/src/mongo/db/repl/rs.cpp @@ -135,10 +135,6 @@ namespace repl { cc().shutdown(); } - void ReplSet::shutdown() { - BackgroundSync::shutdown(); - } - void replLocalAuth() { cc().getAuthorizationSession()->grantInternalAuthorization(); } diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp index 9017d7f715c..52637f3c9be 100644 --- a/src/mongo/db/repl/rs_config.cpp +++ b/src/mongo/db/repl/rs_config.cpp @@ -90,8 +90,6 @@ namespace { { Client::WriteContext cx(txn, rsConfigNs); - //theReplSet->lastOpTimeWritten = ??; - //rather than above, do a logOp()? probably Helpers::putSingletonGod(txn, rsConfigNs.c_str(), newConfigBSON, diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index a4d8f77045a..5709f6eb98c 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/rs.h" +#include "mongo/db/repl/rs_initialsync.h" #include "mongo/bson/optime.h" #include "mongo/db/auth/authorization_manager.h" @@ -42,7 +42,6 @@ #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/initial_sync.h" -#include "mongo/db/repl/member.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" @@ -52,50 +51,14 @@ #include "mongo/util/mongoutils/str.h" namespace mongo { - namespace repl { +namespace { - using namespace mongoutils; - - // add try/catch with sleep - - void isyncassert(const string& msg, bool expr) { - if( !expr ) { - string m = str::stream() << "initial sync " << msg; - theReplSet->sethbmsg(m, 0); - uasserted(13404, m); - } - } - - void ReplSetImpl::syncDoInitialSync() { - static const int maxFailedAttempts = 10; - - OperationContextImpl txn; - createOplog(&txn); - - int failedAttempts = 0; - while ( failedAttempts < maxFailedAttempts ) { - try { - _initialSync(); - break; - } - catch(DBException& e) { - failedAttempts++; - str::stream msg; - msg << "initial sync exception: "; - msg << e.toString() << " " << (maxFailedAttempts - failedAttempts) << " attempts remaining" ; - sethbmsg(msg, 0); - sleepsecs(30); - } - } - fassert( 16233, failedAttempts < maxFailedAttempts); - } - - bool ReplSetImpl::_initialSyncClone(OperationContext* txn, - Cloner& cloner, - const std::string& host, - const list<string>& dbs, - bool dataPass) { + bool _initialSyncClone(OperationContext* txn, + Cloner& cloner, + const std::string& host, + const list<string>& dbs, + bool dataPass) { for( list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++ ) { const string db = *i; @@ -103,9 +66,9 @@ namespace repl { continue; if ( dataPass ) - sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + log() << "initial sync cloning db: " << db; else - sethbmsg( str::stream() << "initial sync cloning indexes for : " << db , 0); + log() << "initial sync cloning indexes for : " << db; string err; int errCode; @@ -124,10 +87,9 @@ namespace repl { Lock::DBLock dbWrite(txn->lockState(), db, newlm::MODE_X); if (!cloner.go(txn, db, host, options, NULL, err, &errCode)) { - sethbmsg(str::stream() << "initial sync: error while " - << (dataPass ? "cloning " : "indexing ") << db - << ". " << (err.empty() ? "" : err + ". ") - << "sleeping 5 minutes" ,0); + log() << "initial sync: error while " + << (dataPass ? "cloning " : "indexing ") << db + << ". " << (err.empty() ? "" : err + ". "); return false; } } @@ -135,140 +97,6 @@ namespace repl { return true; } - static void emptyOplog(OperationContext* txn) { - Client::WriteContext ctx(txn, rsoplog); - - Collection* collection = ctx.ctx().db()->getCollection(txn, rsoplog); - - // temp - if( collection->numRecords(txn) == 0 ) - return; // already empty, ok. - - LOG(1) << "replSet empty oplog" << rsLog; - uassertStatusOK( collection->truncate(txn) ); - ctx.commit(); - } - - const Member* ReplSetImpl::getMemberToSyncTo() { - lock lk(this); - - // if we have a target we've requested to sync from, use it - - if (_forceSyncTarget) { - Member* target = _forceSyncTarget; - _forceSyncTarget = 0; - sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0); - return target; - } - - const Member* primary = box.getPrimary(); - - // wait for 2N pings before choosing a sync target - if (_cfg) { - int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; - - if (needMorePings > 0) { - OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; - return NULL; - } - - // If we are only allowed to sync from the primary, return that - if (!_cfg->chainingAllowed()) { - // Returns NULL if we cannot reach the primary - return primary; - } - } - - // find the member with the lowest ping time that has more data than me - - // Find primary's oplog time. Reject sync candidates that are more than - // maxSyncSourceLagSecs seconds behind. - OpTime primaryOpTime; - if (primary) - primaryOpTime = primary->hbinfo().opTime; - else - // choose a time that will exclude no candidates, since we don't see a primary - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); - - if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) { - // erh - I think this means there was just a new election - // and we don't yet know the new primary's optime - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); - } - - OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0); - - Member *closest = 0; - time_t now = 0; - - // Make two attempts. The first attempt, we ignore those nodes with - // slave delay higher than our own. The second attempt includes such - // nodes, in case those are the only ones we can reach. - // This loop attempts to set 'closest'. - for (int attempts = 0; attempts < 2; ++attempts) { - for (Member *m = _members.head(); m; m = m->next()) { - if (!m->syncable()) - continue; - - if (m->state() == MemberState::RS_SECONDARY) { - // only consider secondaries that are ahead of where we are - if (m->hbinfo().opTime <= lastOpTimeWritten) - continue; - // omit secondaries that are excessively behind, on the first attempt at least. - if (attempts == 0 && - m->hbinfo().opTime < oldestSyncOpTime) - continue; - } - - // omit nodes that are more latent than anything we've already considered - if (closest && - (m->hbinfo().ping > closest->hbinfo().ping)) - continue; - - if (attempts == 0 && - (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) { - continue; // skip this one in the first attempt - } - - map<string,time_t>::iterator vetoed = _veto.find(m->fullName()); - if (vetoed != _veto.end()) { - // Do some veto housekeeping - if (now == 0) { - now = time(0); - } - - // if this was on the veto list, check if it was vetoed in the last "while". - // if it was, skip. - if (vetoed->second >= now) { - if (time(0) % 5 == 0) { - log() << "replSet not trying to sync from " << (*vetoed).first - << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog; - } - continue; - } - _veto.erase(vetoed); - // fall through, this is a valid candidate now - } - // This candidate has passed all tests; set 'closest' - closest = m; - } - if (closest) break; // no need for second attempt - } - - if (!closest) { - return NULL; - } - - sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); - - return closest; - } - - void ReplSetImpl::veto(const string& host, const unsigned secs) { - lock lk(this); - _veto[host] = time(0)+secs; - } - /** * Replays the sync target's oplog from lastOp to the latest op on the sync target. * @@ -277,11 +105,10 @@ namespace repl { * @param source the sync target * @return if applying the oplog succeeded */ - bool ReplSetImpl::_initialSyncApplyOplog( OperationContext* ctx, - repl::SyncTail& syncer, - OplogReader* r, - const Member* source) { - const OpTime startOpTime = lastOpTimeWritten; + bool _initialSyncApplyOplog( OperationContext* ctx, + repl::SyncTail& syncer, + OplogReader* r) { + const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime(); BSONObj lastOp; try { // It may have been a long time since we last used this connection to @@ -291,16 +118,22 @@ namespace repl { // Solution is to increase the TCP keepalive frequency. lastOp = r->getLastOp(rsoplog); } catch ( SocketException & ) { - log() << "connection lost to " << source->h().toString() << "; is your tcp keepalive interval set appropriately?"; - if( !r->connect(source->h()) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); + HostAndPort host = r->getHost(); + log() << "connection lost to " << host.toString() << + "; is your tcp keepalive interval set appropriately?"; + if( !r->connect(host) ) { + error() << "initial sync couldn't connect to " << host.toString(); throw; } // retry lastOp = r->getLastOp(rsoplog); } - isyncassert( "lastOp is empty ", !lastOp.isEmpty() ); + if (lastOp.isEmpty()) { + error() << "initial sync lastOp is empty"; + sleepsecs(1); + return false; + } OpTime stopOpTime = lastOp["ts"]._opTime(); @@ -320,7 +153,7 @@ namespace repl { << rsLog; getGlobalReplicationCoordinator()->setMyLastOptime(ctx, OpTime()); - lastH = 0; + BackgroundSync::get()->setLastHash(0); sleepsecs(5); return false; @@ -350,42 +183,36 @@ namespace repl { * this member should have consistent data. 8 is "cosmetic," it is only to get this member * closer to the latest op time before it can transition out of startup state */ - void ReplSetImpl::_initialSync() { - InitialSync init(BackgroundSync::get()); - SyncTail tail(BackgroundSync::get(), multiSyncApply); - sethbmsg("initial sync pending",0); - - // if this is the first node, it may have already become primary - if ( box.getState().primary() ) { - sethbmsg("I'm already primary, no need for initial sync",0); - return; - } - - const Member *source = getMemberToSyncTo(); - if (!source) { - sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); - sleepsecs(15); - return; - } + void _initialSync() { + BackgroundSync* bgsync(BackgroundSync::get()); + InitialSync init(bgsync); + SyncTail tail(bgsync, multiSyncApply); + log() << "initial sync pending"; - string sourceHostname = source->h().toString(); - init.setHostname(sourceHostname); OplogReader r; - if( !r.connect(source->h()) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); - sleepsecs(15); + OpTime now(Milliseconds(curTimeMillis64()).total_seconds(), 0); + OperationContextImpl txn; + + ReplicationCoordinator* replCoord(getGlobalReplicationCoordinator()); + + // We must prime the sync source selector so that it considers all candidates regardless + // of oplog position, by passing in "now" as the last op fetched time. + r.connectToSyncSource(&txn, now, replCoord); + if (r.getHost().empty()) { + log() << "no valid sync sources found in current replset to do an initial sync"; + sleepsecs(3); return; } + init.setHostname(r.getHost().toString()); + BSONObj lastOp = r.getLastOp(rsoplog); - if( lastOp.isEmpty() ) { - sethbmsg("initial sync couldn't read remote oplog", 0); + if ( lastOp.isEmpty() ) { + log() << "initial sync couldn't read remote oplog"; sleepsecs(15); return; } - OperationContextImpl txn; - if (getGlobalReplicationCoordinator()->getSettings().fastsync) { log() << "fastsync: skipping database clone" << rsLog; @@ -394,54 +221,49 @@ namespace repl { _logOpObjRS(&txn, lastOp); return; } - else { - // Add field to minvalid document to tell us to restart initial sync if we crash - setInitialSyncFlag(&txn); - sethbmsg("initial sync drop all databases", 0); - dropAllDatabasesExceptLocal(&txn); + // Add field to minvalid document to tell us to restart initial sync if we crash + setInitialSyncFlag(&txn); - sethbmsg("initial sync clone all databases", 0); + log() << "initial sync drop all databases"; + dropAllDatabasesExceptLocal(&txn); - list<string> dbs = r.conn()->getDatabaseNames(); + log() << "initial sync clone all databases"; - Cloner cloner; - if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) { - veto(source->fullName(), 600); - sleepsecs(300); - return; - } + list<string> dbs = r.conn()->getDatabaseNames(); - sethbmsg("initial sync data copy, starting syncup",0); + Cloner cloner; + if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) { + return; + } - // prime oplog - init.syncApply(&txn, lastOp, false); - _logOpObjRS(&txn, lastOp); + log() << "initial sync data copy, starting syncup"; - log() << "oplog sync 1 of 3" << endl; - if (!_initialSyncApplyOplog(&txn, init, &r , source)) { - return; - } + // prime oplog + init.syncApply(&txn, lastOp, false); + _logOpObjRS(&txn, lastOp); - // Now we sync to the latest op on the sync target _again_, as we may have recloned ops - // that were "from the future" compared with minValid. During this second application, - // nothing should need to be recloned. - log() << "oplog sync 2 of 3" << endl; - if (!_initialSyncApplyOplog(&txn, init, &r , source)) { - return; - } - // data should now be consistent + log() << "oplog sync 1 of 3" << endl; + if (!_initialSyncApplyOplog(&txn, init, &r)) { + return; + } - sethbmsg("initial sync building indexes",0); - if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) { - veto(source->fullName(), 600); - sleepsecs(300); - return; - } + // Now we sync to the latest op on the sync target _again_, as we may have recloned ops + // that were "from the future" compared with minValid. During this second application, + // nothing should need to be recloned. + log() << "oplog sync 2 of 3" << endl; + if (!_initialSyncApplyOplog(&txn, init, &r)) { + return; + } + // data should now be consistent + + log() << "initial sync building indexes"; + if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) { + return; } log() << "oplog sync 3 of 3" << endl; - if (!_initialSyncApplyOplog(&txn, tail, &r, source)) { + if (!_initialSyncApplyOplog(&txn, tail, &r)) { return; } @@ -453,17 +275,12 @@ namespace repl { return; } - sethbmsg("initial sync finishing up",0); - - verify( !box.getState().primary() ); // wouldn't make sense if we were. + log() << "initial sync finishing up"; { Client::WriteContext cx(&txn, "local."); - - try { - log() << "replSet set minValid=" << lastOpTimeWritten << rsLog; - } - catch(...) { } + OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime()); + log() << "replSet set minValid=" << lastOpTimeWritten << rsLog; // Initial sync is now complete. Flag this by setting minValid to the last thing // we synced. @@ -471,18 +288,39 @@ namespace repl { // Clear the initial sync flag. clearInitialSyncFlag(&txn); + BackgroundSync::get()->setInitialSyncRequestedFlag(false); cx.commit(); } - { - boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex); - theReplSet->initialSyncRequested = false; - } // If we just cloned & there were no ops applied, we still want the primary to know where // we're up to - BackgroundSync::notify(); + bgsync->notify(); + + log() << "initial sync done"; + } +} // namespace + + void syncDoInitialSync() { + static const int maxFailedAttempts = 10; + + OperationContextImpl txn; + createOplog(&txn); - sethbmsg("initial sync done",0); + int failedAttempts = 0; + while ( failedAttempts < maxFailedAttempts ) { + try { + _initialSync(); + break; + } + catch(DBException& e) { + failedAttempts++; + mongoutils::str::stream msg; + error() << "initial sync exception: " << e.toString() << " " << + (maxFailedAttempts - failedAttempts) << " attempts remaining"; + sleepsecs(5); + } + } + fassert( 16233, failedAttempts < maxFailedAttempts); } } // namespace repl diff --git a/src/mongo/db/repl/rs_initialsync.h b/src/mongo/db/repl/rs_initialsync.h new file mode 100644 index 00000000000..659bb5ad577 --- /dev/null +++ b/src/mongo/db/repl/rs_initialsync.h @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { +namespace repl { + /** + * Begins an initial sync of a node. This drops all data, chooses a sync source, + * and runs the cloner from that sync source. The node's state is not changed. + */ + void syncDoInitialSync(); +} +} diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 349987bf655..bd9411a0feb 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -45,11 +45,12 @@ #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" #include "mongo/db/query/internal_plans.h" +#include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_coordinator.h" -#include "mongo/db/repl/rs.h" +#include "mongo/db/repl/repl_coordinator_impl.h" #include "mongo/db/repl/rslog.h" #include "mongo/util/log.h" @@ -695,8 +696,10 @@ namespace { warn = true; } - // reset cached lastoptimewritten and h value - theReplSet->loadLastOpTimeWritten(txn); + // Reload the lastOpTimeApplied value in the replcoord and the lastHash value in bgsync + // to reflect our new last op. + replCoord->resetLastOpTimeFromOplog(txn); + BackgroundSync::get()->loadLastHash(txn); // done if (warn) diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index d7c9218b6ec..142ecaa69fb 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -35,7 +35,6 @@ #include "third_party/murmurhash3/MurmurHash3.h" #include "mongo/base/counter.h" -#include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status.h" @@ -50,6 +49,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rs.h" +#include "mongo/db/repl/rs_initialsync.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/server_parameters.h" @@ -62,46 +62,6 @@ namespace mongo { namespace repl { - /* should be in RECOVERING state on arrival here. - */ - void ReplSetImpl::tryToGoLiveAsASecondary(OperationContext* txn) { - if (getGlobalReplicationCoordinator()->getMaintenanceMode()) { - // we're not actually going live - return; - } - - lock rsLock( this ); - - // if we're blocking sync, don't change state - if (_blockSync) { - return; - } - - // if we're fsync-and-locked, don't bother checking - if (lockedForWriting()) { - return; - } - - Lock::GlobalWrite writeLock(txn->lockState()); - - // Only state RECOVERING can transition to SECONDARY. - MemberState state(getGlobalReplicationCoordinator()->getCurrentMemberState()); - if (!state.recovering()) { - return; - } - - OpTime minvalid = getMinValid(txn); - if (minvalid > getGlobalReplicationCoordinator()->getMyLastOptime()) { - sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << - minvalid.toString()); - return; - } - - sethbmsg(""); - getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_SECONDARY); - } - - Status ReplSetImpl::forceSyncFrom(const string& host, BSONObjBuilder* result) { lock lk(this); @@ -155,9 +115,9 @@ namespace repl { } // record the previous member we were syncing from - const Member *prev = BackgroundSync::get()->getSyncTarget(); - if (prev) { - result->append("prevSyncTarget", prev->fullName()); + const HostAndPort prev = BackgroundSync::get()->getSyncTarget(); + if (!prev.empty()) { + result->append("prevSyncTarget", prev.toString()); } // finally, set the new target @@ -170,7 +130,8 @@ namespace repl { return _forceSyncTarget != 0; } - bool ReplSetImpl::shouldChangeSyncTarget(const OpTime& targetOpTime) const { + bool ReplSetImpl::shouldChangeSyncTarget(const HostAndPort& currentTarget) const { + OpTime targetOpTime = findByName(currentTarget.toString())->hbinfo().opTime; for (Member *m = _members.head(); m; m = m->next()) { if (m->syncable() && targetOpTime.getSecs()+maxSyncSourceLagSecs < m->hbinfo().opTime.getSecs()) { @@ -191,12 +152,7 @@ namespace repl { sleepsecs(1); return; } - - bool initialSyncRequested = false; - { - boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex); - initialSyncRequested = theReplSet->initialSyncRequested; - } + bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag(); // Check criteria for doing an initial sync: // 1. If the oplog is empty, do an initial sync // 2. If minValid has _initialSyncFlag set, do an initial sync @@ -215,20 +171,9 @@ namespace repl { } bool ReplSetImpl::resync(OperationContext* txn, string& errmsg) { - changeState(MemberState::RS_RECOVERING); + getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_STARTUP2); + BackgroundSync::get()->setInitialSyncRequestedFlag(true); - WriteUnitOfWork wunit(txn); - Client::Context ctx(txn, "local"); - - ctx.db()->dropCollection(txn, "local.oplog.rs"); - { - boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex); - theReplSet->initialSyncRequested = true; - } - getGlobalReplicationCoordinator()->setMyLastOptime(txn, OpTime()); - _veto.clear(); - - wunit.commit(); return true; } @@ -262,13 +207,6 @@ namespace repl { } void startSyncThread() { - static int n; - if( n != 0 ) { - log() << "replSet ERROR : more than one sync thread?" << rsLog; - verify( n == 0 ); - } - n++; - Client::initThread("rsSync"); replLocalAuth(); theReplSet->syncThread(); diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index bfa7507c18b..81084cee2c7 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -46,6 +46,7 @@ #include "mongo/db/repl/rslog.h" #include "mongo/db/operation_context.h" #include "mongo/util/log.h" +#include "mongo/util/net/hostandport.h" namespace mongo { @@ -54,8 +55,7 @@ namespace repl { // used in replAuthenticate static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); - SyncSourceFeedback::SyncSourceFeedback() : _syncTarget(NULL), - _positionChanged(false), + SyncSourceFeedback::SyncSourceFeedback() : _positionChanged(false), _handshakeNeeded(false), _shutdownSignaled(false) {} SyncSourceFeedback::~SyncSourceFeedback() {} @@ -253,18 +253,18 @@ namespace repl { _resetConnection(); continue; } - const Member* target = BackgroundSync::get()->getSyncTarget(); + const HostAndPort target = BackgroundSync::get()->getSyncTarget(); if (_syncTarget != target) { _resetConnection(); _syncTarget = target; } if (!hasConnection()) { // fix connection if need be - if (!target) { + if (target.empty()) { sleepmillis(500); continue; } - if (!_connect(&txn, target->h())) { + if (!_connect(&txn, target)) { sleepmillis(500); continue; } diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index fc5a33a8469..fc20aa20ee6 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -35,15 +35,13 @@ #include "mongo/client/constants.h" #include "mongo/client/dbclientcursor.h" +#include "mongo/util/net/hostandport.h" namespace mongo { - class OperationContext; namespace repl { - class Member; - class SyncSourceFeedback { public: SyncSourceFeedback(); @@ -100,7 +98,7 @@ namespace repl { /// TODO(spencer): Remove this once the LegacyReplicationCoordinator is gone. BSONObj _me; // the member we are currently syncing from - const Member* _syncTarget; + HostAndPort _syncTarget; // our connection to our sync target boost::scoped_ptr<DBClientConnection> _connection; // protects cond, _shutdownSignaled, and the indicator bools. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index d87d475056f..65d345d3a1b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -35,6 +35,7 @@ #include "third_party/murmurhash3/MurmurHash3.h" #include "mongo/base/counter.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/curop.h" @@ -324,12 +325,20 @@ namespace { // (always checked in the first iteration of this do-while loop, because // ops is empty) if (ops.empty() || now > lastTimeChecked) { - { - boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex); - if (theReplSet->initialSyncRequested) { - // got a resync command - return; - } + BackgroundSync* bgsync = BackgroundSync::get(); + if (bgsync->getInitialSyncRequestedFlag()) { + // got a resync command + Lock::DBWrite lk(txn.lockState(), "local"); + WriteUnitOfWork wunit(&txn); + Client::Context ctx(&txn, "local"); + + ctx.db()->dropCollection(&txn, "local.oplog.rs"); + getGlobalReplicationCoordinator()->setMyLastOptime(&txn, OpTime()); + theReplSet->_veto.clear(); + bgsync->stop(); + wunit.commit(); + + return; } lastTimeChecked = now; // can we become secondary? @@ -389,18 +398,8 @@ namespace { OpTime minValid = lastOp["ts"]._opTime(); setMinValid(&txn, minValid); - if (BackgroundSync::get()->isAssumingPrimary()) { - LOG(1) << "about to apply batch up to optime: " - << ops.getDeque().back()["ts"]._opTime().toStringPretty(); - } - multiApply(ops.getDeque()); - if (BackgroundSync::get()->isAssumingPrimary()) { - LOG(1) << "about to update oplog to optime: " - << ops.getDeque().back()["ts"]._opTime().toStringPretty(); - } - applyOpsToOplog(&ops.getDeque()); // If we're just testing (no manager), don't keep looping if we exhausted the bgqueue @@ -494,12 +493,8 @@ namespace { wunit.commit(); } - if (BackgroundSync::get()->isAssumingPrimary()) { - LOG(1) << "notifying BackgroundSync"; - } - // Update write concern on primary - BackgroundSync::notify(); + BackgroundSync::get()->notify(); return lastOpTime; } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 6729c4d14f3..ee76f5c89b8 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -39,7 +39,6 @@ namespace mongo { class OperationContext; namespace repl { - class BackgroundSyncInterface; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 0f7e5e2c076..b5a90386e1b 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -253,7 +253,7 @@ namespace { return _syncSource; } _syncSource = _currentConfig.getMemberAt(closestIndex).getHostAndPort(); - std::string msg(str::stream() << "syncing to: " << _syncSource.toString(), 0); + std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0); _sethbmsg(msg); log() << msg; return _syncSource; diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index fc1ee065097..d528a4f8605 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -2625,7 +2625,7 @@ namespace { ASSERT_EQUALS(OpTime(0,0), response.getOpTime()); ASSERT_EQUALS(Seconds(0).total_milliseconds(), response.getTime().total_milliseconds()); // changed to a syncing message because our sync source changed recently - ASSERT_EQUALS("syncing to: h2:27017", response.getHbMsg()); + ASSERT_EQUALS("syncing from: h2:27017", response.getHbMsg()); ASSERT_EQUALS("rs0", response.getReplicaSetName()); ASSERT_EQUALS(1, response.getVersion()); ASSERT_EQUALS(HostAndPort("h2").toString(), response.getSyncingTo()); |