summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2013-07-15 14:30:17 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2013-07-22 10:43:40 -0400
commit27c4e7fbd2ef6eeb04dccd1bcdecdb21b00522d1 (patch)
treef129314545bfcc0a5c64fb2701baa2fc4f1628a2 /src/mongo/db/repl/bgsync.cpp
parent9bf70757db09b3a7166440c508bf798d50bd212a (diff)
downloadmongo-27c4e7fbd2ef6eeb04dccd1bcdecdb21b00522d1.tar.gz
Revert "Revert "SERVER-6071 use command on local.slaves instead of cursor""
This reverts commit 6486b4035c5ac52679eb3e1a034c925ccdd20deb.
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp68
1 files changed, 38 insertions, 30 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 6e9e7538f5a..f453664262d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -22,6 +22,7 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/rs.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/base/counter.h"
#include "mongo/db/stats/timer_stats.h"
@@ -79,7 +80,6 @@ namespace replset {
_assumingPrimary(false),
_currentSyncTarget(NULL),
_oplogMarkerTarget(NULL),
- _oplogMarker(true /* doHandshake */),
_consumedOpTime(0, 0) {
}
@@ -122,6 +122,7 @@ namespace replset {
void BackgroundSync::notifierThread() {
Client::initThread("rsSyncNotifier");
replLocalAuth();
+ theReplSet->syncSourceFeedback.go();
while (!inShutdown()) {
bool clearTarget = false;
@@ -168,37 +169,44 @@ namespace replset {
}
void BackgroundSync::markOplog() {
- LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog;
+ LOG(3) << "replset markOplog: " << _consumedOpTime << " "
+ << theReplSet->lastOpTimeWritten << rsLog;
- if (!hasCursor()) {
- sleepsecs(1);
- return;
+ if (theReplSet->syncSourceFeedback.supportsUpdater()) {
+ theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
+ _consumedOpTime = theReplSet->lastOpTimeWritten;
}
+ else {
+ if (!hasCursor()) {
+ return;
+ }
- if (!_oplogMarker.moreInCurrentBatch()) {
- _oplogMarker.more();
- }
+ if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) {
+ theReplSet->syncSourceFeedback.more();
+ }
- if (!_oplogMarker.more()) {
- _oplogMarker.tailCheck();
- sleepsecs(1);
- return;
- }
+ if (!theReplSet->syncSourceFeedback.more()) {
+ theReplSet->syncSourceFeedback.tailCheck();
+ return;
+ }
- // if this member has written the op at optime T, we want to nextSafe up to and including T
- while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) {
- BSONObj temp = _oplogMarker.nextSafe();
- _consumedOpTime = temp["ts"]._opTime();
- }
+ // if this member has written the op at optime T
+ // we want to nextSafe up to and including T
+ while (_consumedOpTime < theReplSet->lastOpTimeWritten
+ && theReplSet->syncSourceFeedback.more()) {
+ BSONObj temp = theReplSet->syncSourceFeedback.nextSafe();
+ _consumedOpTime = temp["ts"]._opTime();
+ }
- // call more() to signal the sync target that we've synced T
- _oplogMarker.more();
+ // call more() to signal the sync target that we've synced T
+ theReplSet->syncSourceFeedback.more();
+ }
}
bool BackgroundSync::hasCursor() {
{
// prevent writers from blocking readers during fsync
- SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
+ SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
// we don't need the local write lock yet, but it's needed by OplogReader::connect
// so we take it preemptively to avoid deadlocking.
Lock::DBWrite lk("local");
@@ -210,25 +218,23 @@ namespace replset {
return false;
}
- log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog;
+ log() << "replset setting oplog notifier to "
+ << _currentSyncTarget->fullName() << rsLog;
_oplogMarkerTarget = _currentSyncTarget;
- _oplogMarker.resetConnection();
-
- if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) {
- LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog;
+ if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) {
_oplogMarkerTarget = NULL;
return false;
}
}
}
-
- if (!_oplogMarker.haveCursor()) {
+ if (!theReplSet->syncSourceFeedback.haveCursor()) {
BSONObj fields = BSON("ts" << 1);
- _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields);
+ theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog,
+ theReplSet->lastOpTimeWritten, &fields);
}
- return _oplogMarker.haveCursor();
+ return theReplSet->syncSourceFeedback.haveCursor();
}
void BackgroundSync::producerThread() {
@@ -525,6 +531,8 @@ namespace replset {
_currentSyncTarget = target;
}
+ theReplSet->syncSourceFeedback.connect(target);
+
return;
}