summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2013-06-18 10:29:18 -0400
committerEric Milkie <milkie@10gen.com>2013-08-02 11:30:07 -0400
commitb89e706faca937c07cc578c37c962532fc7b751b (patch)
tree71d2420230d095f39a5d4fa7520faef348a4e974
parent49109a06220f2497488e6b6f67cd740c44a856dc (diff)
downloadmongo-b89e706faca937c07cc578c37c962532fc7b751b.tar.gz
SERVER-9934 remove double getmore when pulling new ops
This reduces the potential delay in ending the bgsync thread from 10 seconds to 5 seconds. This is important because assumingPrimaryness waits on this loop before changing state to PRIMARY after an election. To reduce the delay lower than 5 seconds requires further redesign of the code.
-rw-r--r--src/mongo/db/repl/bgsync.cpp113
1 files changed, 61 insertions, 52 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 49218c006a5..f2c22151636 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -241,11 +241,10 @@ namespace replset {
try {
_producerThread();
}
- catch (DBException& e) {
- sethbmsg(str::stream() << "db exception in producer: " << e.toString());
- sleepsecs(10);
+ catch (const DBException& e) {
+ sethbmsg(str::stream() << "sync source problem: " << e.toString());
}
- catch (std::exception& e2) {
+ catch (const std::exception& e2) {
sethbmsg(str::stream() << "exception in producer: " << e2.what());
sleepsecs(60);
}
@@ -322,72 +321,82 @@ namespace replset {
}
while (!inShutdown()) {
- while (!inShutdown()) {
- if (!r.moreInCurrentBatch()) {
- if (theReplSet->gotForceSync()) {
- return;
- }
-
- if (isAssumingPrimary() || theReplSet->isPrimary()) {
- return;
- }
-
- // re-evaluate quality of sync target
- if (shouldChangeSyncTarget()) {
- return;
- }
- //record time for each getmore
- {
- TimerHolder batchTimer(&getmoreReplStats);
- r.more();
- }
- //increment
- networkByteStats.increment(r.currentBatchMessageSize());
+ if (!r.moreInCurrentBatch()) {
+ // Check some things periodically
+ // (whenever we run out of items in the
+ // current cursor batch)
+ if (theReplSet->gotForceSync()) {
+ return;
+ }
+ // If we are transitioning to primary state, we need to leave
+ // this loop in order to go into bgsync-pause mode.
+ if (isAssumingPrimary() || theReplSet->isPrimary()) {
+ return;
}
- if (!r.more())
- break;
+ // re-evaluate quality of sync target
+ if (shouldChangeSyncTarget()) {
+ return;
+ }
- BSONObj o = r.nextSafe().getOwned();
- opsReadStats.increment();
{
- boost::unique_lock<boost::mutex> lock(_mutex);
- _appliedBuffer = false;
+ //record time for each getmore
+ TimerHolder batchTimer(&getmoreReplStats);
+
+ // This calls receiveMore() on the oplogreader cursor.
+ // It can wait up to five seconds for more data.
+ r.more();
}
+ networkByteStats.increment(r.currentBatchMessageSize());
- OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog;
- }
- // the blocking queue will wait (forever) until there's room for us to push
- _buffer.push(o);
- bufferCountGauge.increment();
- bufferSizeGauge.increment(getSize(o));
+ if (!r.moreInCurrentBatch()) {
+ // If there is still no data from upstream, check a few more things
+ // and then loop back for another pass at getting more data
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (_pause ||
+ !_currentSyncTarget ||
+ !_currentSyncTarget->hbinfo().hbstate.readable()) {
+ return;
+ }
+ }
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- _lastH = o["h"].numberLong();
- _lastOpTimeFetched = o["ts"]._opTime();
+ r.tailCheck();
+ if( !r.haveCursor() ) {
+ LOG(1) << "replSet end syncTail pass" << rsLog;
+ return;
+ }
+
+ continue;
}
- } // end while
+ }
+
+ // At this point, we are guaranteed to have at least one thing to read out
+ // of the oplogreader cursor.
+ BSONObj o = r.nextSafe().getOwned();
+ opsReadStats.increment();
{
boost::unique_lock<boost::mutex> lock(_mutex);
- if (_pause || !_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
- return;
- }
+ _appliedBuffer = false;
}
-
- r.tailCheck();
- if( !r.haveCursor() ) {
- LOG(1) << "replSet end syncTail pass" << rsLog;
- return;
+ OCCASIONALLY {
+ LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog;
}
+ // the blocking queue will wait (forever) until there's room for us to push
+ _buffer.push(o);
+ bufferCountGauge.increment();
+ bufferSizeGauge.increment(getSize(o));
- // looping back is ok because this is a tailable cursor
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _lastH = o["h"].numberLong();
+ _lastOpTimeFetched = o["ts"]._opTime();
+ }
}
}