diff options
author | Eric Milkie <milkie@10gen.com> | 2013-06-18 10:29:18 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2013-08-02 11:30:07 -0400 |
commit | b89e706faca937c07cc578c37c962532fc7b751b (patch) | |
tree | 71d2420230d095f39a5d4fa7520faef348a4e974 | |
parent | 49109a06220f2497488e6b6f67cd740c44a856dc (diff) | |
download | mongo-b89e706faca937c07cc578c37c962532fc7b751b.tar.gz |
SERVER-9934 remove double getmore when pulling new ops
This reduces the potential delay in ending the bgsync thread from 10 seconds
to 5 seconds. This is important because assumingPrimaryness waits on this loop
before changing state to PRIMARY after an election.
To reduce the delay lower than 5 seconds requires further redesign of the code.
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 113 |
1 files changed, 61 insertions, 52 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 49218c006a5..f2c22151636 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -241,11 +241,10 @@ namespace replset { try { _producerThread(); } - catch (DBException& e) { - sethbmsg(str::stream() << "db exception in producer: " << e.toString()); - sleepsecs(10); + catch (const DBException& e) { + sethbmsg(str::stream() << "sync source problem: " << e.toString()); } - catch (std::exception& e2) { + catch (const std::exception& e2) { sethbmsg(str::stream() << "exception in producer: " << e2.what()); sleepsecs(60); } @@ -322,72 +321,82 @@ namespace replset { } while (!inShutdown()) { - while (!inShutdown()) { - if (!r.moreInCurrentBatch()) { - if (theReplSet->gotForceSync()) { - return; - } - - if (isAssumingPrimary() || theReplSet->isPrimary()) { - return; - } - - // re-evaluate quality of sync target - if (shouldChangeSyncTarget()) { - return; - } - //record time for each getmore - { - TimerHolder batchTimer(&getmoreReplStats); - r.more(); - } - //increment - networkByteStats.increment(r.currentBatchMessageSize()); + if (!r.moreInCurrentBatch()) { + // Check some things periodically + // (whenever we run out of items in the + // current cursor batch) + if (theReplSet->gotForceSync()) { + return; + } + // If we are transitioning to primary state, we need to leave + // this loop in order to go into bgsync-pause mode. + if (isAssumingPrimary() || theReplSet->isPrimary()) { + return; } - if (!r.more()) - break; + // re-evaluate quality of sync target + if (shouldChangeSyncTarget()) { + return; + } - BSONObj o = r.nextSafe().getOwned(); - opsReadStats.increment(); { - boost::unique_lock<boost::mutex> lock(_mutex); - _appliedBuffer = false; + //record time for each getmore + TimerHolder batchTimer(&getmoreReplStats); + + // This calls receiveMore() on the oplogreader cursor. + // It can wait up to five seconds for more data. + r.more(); } + networkByteStats.increment(r.currentBatchMessageSize()); - OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog; - } - // the blocking queue will wait (forever) until there's room for us to push - _buffer.push(o); - bufferCountGauge.increment(); - bufferSizeGauge.increment(getSize(o)); + if (!r.moreInCurrentBatch()) { + // If there is still no data from upstream, check a few more things + // and then loop back for another pass at getting more data + { + boost::unique_lock<boost::mutex> lock(_mutex); + if (_pause || + !_currentSyncTarget || + !_currentSyncTarget->hbinfo().hbstate.readable()) { + return; + } + } - { - boost::unique_lock<boost::mutex> lock(_mutex); - _lastH = o["h"].numberLong(); - _lastOpTimeFetched = o["ts"]._opTime(); + r.tailCheck(); + if( !r.haveCursor() ) { + LOG(1) << "replSet end syncTail pass" << rsLog; + return; + } + + continue; } - } // end while + } + + // At this point, we are guaranteed to have at least one thing to read out + // of the oplogreader cursor. + BSONObj o = r.nextSafe().getOwned(); + opsReadStats.increment(); { boost::unique_lock<boost::mutex> lock(_mutex); - if (_pause || !_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) { - return; - } + _appliedBuffer = false; } - - r.tailCheck(); - if( !r.haveCursor() ) { - LOG(1) << "replSet end syncTail pass" << rsLog; - return; + OCCASIONALLY { + LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog; } + // the blocking queue will wait (forever) until there's room for us to push + _buffer.push(o); + bufferCountGauge.increment(); + bufferSizeGauge.increment(getSize(o)); - // looping back is ok because this is a tailable cursor + { + boost::unique_lock<boost::mutex> lock(_mutex); + _lastH = o["h"].numberLong(); + _lastOpTimeFetched = o["ts"]._opTime(); + } } } |