diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 68 |
1 files changed, 38 insertions, 30 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 6e9e7538f5a..f453664262d 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -22,6 +22,7 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/rs.h" #include "mongo/util/fail_point_service.h" #include "mongo/base/counter.h" #include "mongo/db/stats/timer_stats.h" @@ -79,7 +80,6 @@ namespace replset { _assumingPrimary(false), _currentSyncTarget(NULL), _oplogMarkerTarget(NULL), - _oplogMarker(true /* doHandshake */), _consumedOpTime(0, 0) { } @@ -122,6 +122,7 @@ namespace replset { void BackgroundSync::notifierThread() { Client::initThread("rsSyncNotifier"); replLocalAuth(); + theReplSet->syncSourceFeedback.go(); while (!inShutdown()) { bool clearTarget = false; @@ -168,37 +169,44 @@ namespace replset { } void BackgroundSync::markOplog() { - LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog; + LOG(3) << "replset markOplog: " << _consumedOpTime << " " + << theReplSet->lastOpTimeWritten << rsLog; - if (!hasCursor()) { - sleepsecs(1); - return; + if (theReplSet->syncSourceFeedback.supportsUpdater()) { + theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten); + _consumedOpTime = theReplSet->lastOpTimeWritten; } + else { + if (!hasCursor()) { + return; + } - if (!_oplogMarker.moreInCurrentBatch()) { - _oplogMarker.more(); - } + if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) { + theReplSet->syncSourceFeedback.more(); + } - if (!_oplogMarker.more()) { - _oplogMarker.tailCheck(); - sleepsecs(1); - return; - } + if (!theReplSet->syncSourceFeedback.more()) { + theReplSet->syncSourceFeedback.tailCheck(); + return; + } - // if this member has written the op at optime T, we want to nextSafe up to and including T - while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) { - BSONObj temp = _oplogMarker.nextSafe(); - _consumedOpTime = temp["ts"]._opTime(); - } + // if this member has written the op at optime T + // we want to nextSafe up to and including T + while (_consumedOpTime < theReplSet->lastOpTimeWritten + && theReplSet->syncSourceFeedback.more()) { + BSONObj temp = theReplSet->syncSourceFeedback.nextSafe(); + _consumedOpTime = temp["ts"]._opTime(); + } - // call more() to signal the sync target that we've synced T - _oplogMarker.more(); + // call more() to signal the sync target that we've synced T + theReplSet->syncSourceFeedback.more(); + } } bool BackgroundSync::hasCursor() { { // prevent writers from blocking readers during fsync - SimpleMutex::scoped_lock fsynclk(filesLockedFsync); + SimpleMutex::scoped_lock fsynclk(filesLockedFsync); // we don't need the local write lock yet, but it's needed by OplogReader::connect // so we take it preemptively to avoid deadlocking. Lock::DBWrite lk("local"); @@ -210,25 +218,23 @@ namespace replset { return false; } - log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog; + log() << "replset setting oplog notifier to " + << _currentSyncTarget->fullName() << rsLog; _oplogMarkerTarget = _currentSyncTarget; - _oplogMarker.resetConnection(); - - if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) { - LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog; + if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) { _oplogMarkerTarget = NULL; return false; } } } - - if (!_oplogMarker.haveCursor()) { + if (!theReplSet->syncSourceFeedback.haveCursor()) { BSONObj fields = BSON("ts" << 1); - _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields); + theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog, + theReplSet->lastOpTimeWritten, &fields); } - return _oplogMarker.haveCursor(); + return theReplSet->syncSourceFeedback.haveCursor(); } void BackgroundSync::producerThread() { @@ -525,6 +531,8 @@ namespace replset { _currentSyncTarget = target; } + theReplSet->syncSourceFeedback.connect(target); + return; } |