| author | Kristina <kristina@10gen.com> | 2012-04-20 15:54:26 -0400 |
|---|---|---|
| committer | Kristina <kristina@10gen.com> | 2012-05-16 17:08:46 -0400 |
| commit | 5ba5abbb01583e637e7d380ab869bf4d695848af (patch) | |
| tree | d801cc171c290f0e9fdd4855ed1d5dc0bd436bb3 /src/mongo/db/repl/bgsync.cpp | |
| parent | adfe6f647fcbf7737c81fa747d8bf3599cc7d84c (diff) | |
| download | mongo-5ba5abbb01583e637e7d380ab869bf4d695848af.tar.gz | |
Separate thread for fetching oplog ops SERVER-4392
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 324 |
1 file changed, 281 insertions, 43 deletions
```diff
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 830bfea87d2..eb45f2e02a2 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -15,6 +15,7 @@
  */
 
 #include "mongo/pch.h"
 
+#include "mongo/db/repl/rs_sync.h"
 #include "mongo/db/repl/bgsync.h"
 #include "mongo/db/client.h"
@@ -25,7 +26,13 @@ namespace replset {
     BackgroundSyncInterface::~BackgroundSyncInterface() {}
 
-    BackgroundSync::BackgroundSync() : _oplogMarkerTarget(NULL),
+    BackgroundSync::BackgroundSync() : _maxSize(200*1024*1024),
+                                       _bufSize(0),
+                                       _lastOpTimeFetched(0, 0),
+                                       _lastH(0),
+                                       _pause(true),
+                                       _currentSyncTarget(NULL),
+                                       _oplogMarkerTarget(NULL),
                                        _oplogMarker(true /* doHandshake */),
                                        _consumedOpTime(0, 0) {
     }
 
@@ -38,6 +45,15 @@ namespace replset {
         return s_instance;
     }
 
+    void BackgroundSync::shutdown() {
+        boost::unique_lock<boost::mutex> lock(s_mutex);
+        if (s_instance == NULL) {
+            return;
+        }
+
+        s_instance->_bufCond.notify_all();
+    }
+
     void BackgroundSync::notifierThread() {
         Client::initThread("rsSyncNotifier");
         replLocalAuth();
@@ -107,9 +123,6 @@ namespace replset {
     }
 
     bool BackgroundSync::hasCursor() {
-        // temp
-        Member *_currentSyncTarget = theReplSet->getSyncTarget();
-
         {
             // we don't need the global write lock yet, but it's needed by OplogReader::connect
             // so we take it preemptively to avoid deadlocking.
@@ -142,74 +155,264 @@ namespace replset {
         return _oplogMarker.haveCursor();
     }
 
-} // namespace replset
+    void BackgroundSync::producerThread() {
+        Client::initThread("rsBackgroundSync");
+        replLocalAuth();
+
+        while (!inShutdown()) {
+            if (!theReplSet) {
+                log() << "replSet warning did not receive a valid config yet, sleeping 20 seconds " << rsLog;
+                sleepsecs(20);
+                continue;
+            }
+
+            try {
+                _producerThread();
+            }
+            catch (DBException& e) {
+                sethbmsg(str::stream() << "syncThread: " << e.toString());
+                sleepsecs(10);
+            }
+            catch (...) {
+                sethbmsg("unexpected exception in syncThread()");
+                sleepsecs(60);
+            }
+
+            sleepsecs(1);
+        }
+
+        cc().shutdown();
+    }
+
+    void BackgroundSync::_producerThread() {
+        MemberState state = theReplSet->state();
+
+        // we want to pause when the state changes to primary
+        if (state.primary()) {
+            if (!_pause) {
+                stop();
+            }
+            sleepsecs(1);
+            return;
+        }
+
+        if (state.fatal() || state.startup()) {
+            sleepsecs(5);
+            return;
+        }
+
+        // if this member has an empty oplog, we cannot start syncing
+        if (theReplSet->lastOpTimeWritten.isNull()) {
+            sleepsecs(1);
+            return;
+        }
+        // we want to unpause when we're no longer primary
+        // start() also loads _lastOpTimeFetched, which we know is set from the "if"
+        else if (_pause) {
+            start();
+        }
+
+        produce();
+    }
+
+    void BackgroundSync::produce() {
+        bool doHandshake = false;
+        OplogReader r(doHandshake);
+
+        // find a target to sync from the last op time written
+        getOplogReader(r);
+
+        // no server found
+        {
+            boost::unique_lock<boost::mutex> lock(_mutex);
+
+            if (_currentSyncTarget == NULL) {
+                sleepsecs(1);
+                // if there is no one to sync from
+                return;
+            }
+
+            r.tailingQueryGTE(rsoplog, _lastOpTimeFetched);
+        }
+
+        // if target cut connections between connecting and querying (for
+        // example, because it stepped down) we might not have a cursor
+        if (!r.haveCursor()) {
+            return;
+        }
+
+        uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );
+
+        if (isRollbackRequired(r)) {
+            stop();
+            return;
+        }
+
+        while (!inShutdown()) {
+            while (!inShutdown()) {
+                // if the buffer is full, wait for items to be removed
+                if (_bufSize >= _maxSize) {
+                    boost::unique_lock<boost::mutex> lock(_mutex);
+                    _bufCond.wait(lock);
+
+                    // if we produce [peanut, ..., peanut, whale] we'll have to consume more than
+                    // one peanut before we're below _maxSize, so keep waiting until we are
+                    break;
+                }
+
+                if (!r.moreInCurrentBatch()) {
+                    if (theReplSet->gotForceSync()) {
+                        return;
+                    }
+                    if (theReplSet->isPrimary()) {
+                        return;
+                    }
 
-    bool ReplSetImpl::_isStale(OplogReader& r, const OpTime& startTs, BSONObj& remoteOldestOp) {
+                    {
+                        boost::unique_lock<boost::mutex> lock(_mutex);
+                        if (!_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
+                            return;
+                        }
+                    }
+
+                    r.more();
+                }
+
+                if (!r.more())
+                    break;
+
+                BSONObj o = r.nextSafe();
+
+                {
+                    boost::unique_lock<boost::mutex> lock(_mutex);
+                    _buffer.push(o.getOwned());
+                    _bufSize += o.objsize();
+                    _lastH = o["h"].numberLong();
+                    _lastOpTimeFetched = o["ts"]._opTime();
+                }
+            } // end while
+
+            {
+                boost::unique_lock<boost::mutex> lock(_mutex);
+                if (_pause || !_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
+                    return;
+                }
+            }
+
+            r.tailCheck();
+            if( !r.haveCursor() ) {
+                LOG(1) << "replSet end syncTail pass" << rsLog;
+                return;
+            }
+
+            // looping back is ok because this is a tailable cursor
+        }
+    }
+
+    BSONObj* BackgroundSync::peek() {
+        boost::unique_lock<boost::mutex> lock(_mutex);
+
+        if (_currentSyncTarget != _oplogMarkerTarget &&
+            _currentSyncTarget != NULL) {
+            _oplogMarkerTarget = NULL;
+        }
+
+        if (_buffer.empty()) {
+            // it should already be 0, but just in case
+            _bufSize = 0;
+            return NULL;
+        }
+
+        return &_buffer.front();
+    }
+
+    void BackgroundSync::consume() {
+        boost::unique_lock<boost::mutex> lock(_mutex);
+        BSONObj& front = _buffer.front();
+
+        // remove from the queue first, in case catchup goes wrong
+        _bufSize -= front.objsize();
+        _buffer.pop();
+
+        // wake up producer, if it's waiting for docs to be consumed
+        _bufCond.notify_one();
+    }
+
+    bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) {
         remoteOldestOp = r.findOne(rsoplog, Query());
         OpTime remoteTs = remoteOldestOp["ts"]._opTime();
-        DEV log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
-        else LOG(3) << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
         DEV {
-            log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
-            log() << "replSet our state: " << state().toString() << rsLog;
+            log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+            log() << "replSet lastOpTimeFetched: " << _lastOpTimeFetched.toStringLong() << rsLog;
         }
-        if( startTs >= remoteTs ) {
-            return false;
+        LOG(3) << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+
+        {
+            boost::unique_lock<boost::mutex> lock(_mutex);
+
+            if (_lastOpTimeFetched >= remoteTs) {
+                return false;
+            }
         }
 
         return true;
     }
 
-    Member* ReplSetImpl::_getOplogReader(OplogReader& r, const OpTime& minTS) {
-        Member *target = 0, *stale = 0;
+    void BackgroundSync::getOplogReader(OplogReader& r) {
+        Member *target = NULL, *stale = NULL;
         BSONObj oldest;
 
-        verify(r.conn() == 0);
+        // then we're initial syncing and we're still waiting for this to be set
+        {
+            boost::unique_lock<boost::mutex> lock(_mutex);
+            if (_lastOpTimeFetched.isNull()) {
+                _currentSyncTarget = NULL;
+                return;
+            }
+        }
+
+        verify(r.conn() == NULL);
 
-        while ((target = getMemberToSyncTo()) != 0) {
+        while ((target = theReplSet->getMemberToSyncTo()) != NULL) {
             string current = target->fullName();
 
-            if( !r.connect(current) ) {
+            if (!r.connect(current)) {
                 log(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
                 r.resetConnection();
-                veto(current);
+                theReplSet->veto(current);
                 continue;
             }
 
-            if( !minTS.isNull() && _isStale(r, minTS, oldest) ) {
+            if (isStale(r, oldest)) {
                 r.resetConnection();
-                veto(current, 600);
+                theReplSet->veto(current, 600);
                 stale = target;
                 continue;
             }
 
             // if we made it here, the target is up and not stale
-            return target;
+            {
+                boost::unique_lock<boost::mutex> lock(_mutex);
+                _currentSyncTarget = target;
+            }
+
+            return;
         }
 
         // the only viable sync target was stale
         if (stale) {
-            log() << "replSet error RS102 too stale to catch up, at least from " << stale->fullName() << rsLog;
-            log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
-            log() << "replSet oldest at " << stale->fullName() << " : " << oldest["ts"]._opTime().toStringLong() << rsLog;
-            log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
-
-            // reset minvalid so that we can't become primary prematurely
-            {
-                Lock::DBWrite lk("local.replset.minvalid");
-                Helpers::putSingleton("local.replset.minvalid", oldest);
-            }
-
-            sethbmsg("error RS102 too stale to catch up");
-            changeState(MemberState::RS_RECOVERING);
+            theReplSet->goStale(stale, oldest);
             sleepsecs(120);
         }
 
-        return 0;
+        {
+            boost::unique_lock<boost::mutex> lock(_mutex);
+            _currentSyncTarget = NULL;
+        }
     }
 
-    bool ReplSetImpl::isRollbackRequired(OplogReader& r) {
+    bool BackgroundSync::isRollbackRequired(OplogReader& r) {
         string hn = r.conn()->getServerAddress();
 
         if (!r.more()) {
@@ -221,9 +424,9 @@ namespace replset {
             return true;
         }
         OpTime theirTS = theirLastOp["ts"]._opTime();
-        if (theirTS < lastOpTimeWritten) {
+        if (theirTS < theReplSet->lastOpTimeWritten) {
             log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
-            syncRollback(r);
+            theReplSet->syncRollback(r);
             return true;
         }
         /* we're not ahead? maybe our new query got fresher data. best to come back and try again */
@@ -240,20 +443,55 @@ namespace replset {
         BSONObj o = r.nextSafe();
         OpTime ts = o["ts"]._opTime();
         long long h = o["h"].numberLong();
-        if( ts != lastOpTimeWritten || h != lastH ) {
-            log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << rsLog;
+        if( ts != theReplSet->lastOpTimeWritten || h != theReplSet->lastH ) {
+            log() << "replSet our last op time written: " << theReplSet->lastOpTimeWritten.toStringPretty() << rsLog;
             log() << "replset source's GTE: " << ts.toStringPretty() << rsLog;
-            syncRollback(r);
+            theReplSet->syncRollback(r);
            return true;
         }
 
         return false;
     }
 
-    Member* ReplSetImpl::getSyncTarget() {
-        lock lk(this);
+    Member* BackgroundSync::getSyncTarget() {
+        boost::unique_lock<boost::mutex> lock(_mutex);
         return _currentSyncTarget;
     }
 
+    void BackgroundSync::stop() {
+        boost::unique_lock<boost::mutex> lock(_mutex);
+
+        int popped = 0;
+
+        _pause = true;
+        _currentSyncTarget = NULL;
+        _lastOpTimeFetched = OpTime(0,0);
+        _lastH = 0;
+
+        // get rid of pending ops
+        while (!_buffer.empty()) {
+            _buffer.pop();
+            popped++;
+        }
+        _bufSize = 0;
+        if (popped > 0) {
+            log() << "replset " << popped << " ops were not applied from buffer, this should "
+                  << "cause a rollback on the former primary" << rsLog;
+        }
+    }
+
+    void BackgroundSync::start() {
+        boost::unique_lock<boost::mutex> lock(_mutex);
+        _pause = false;
+        massert(16160, "going to start syncing, but buffer is not empty", _buffer.empty() && _bufSize == 0);
+
+        // reset _last fields with current data
+        _lastOpTimeFetched = theReplSet->lastOpTimeWritten;
+        _lastH = theReplSet->lastH;
+
+        LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastH << rsLog;
+    }
+
+} // namespace replset
 } // namespace mongo
```
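
The heart of this patch is a bounded producer/consumer queue: `produce()` pushes each fetched oplog entry into `_buffer`, tracks its byte size in `_bufSize`, and waits on `_bufCond` once `_bufSize` reaches `_maxSize` (200 MB), while the applier drains the queue through `peek()` and `consume()`, with `consume()` waking the producer after every pop. Below is a minimal standalone sketch of that pattern; the names (`Op`, `OpBuffer`, the tiny 64-byte cap) are illustrative only, not from the patch, and it uses the standard `<mutex>`/`<condition_variable>` headers instead of the boost primitives the real code relies on.

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Op stands in for a BSONObj oplog entry; size() plays the role of objsize().
struct Op {
    std::string payload;
    size_t size() const { return payload.size(); }
};

class OpBuffer {
public:
    explicit OpBuffer(size_t maxSize) : _maxSize(maxSize), _curSize(0) {}

    // Producer side: block while the buffer is at or over its byte cap,
    // analogous to the _bufSize >= _maxSize wait in produce().
    void push(Op op) {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [&] { return _curSize < _maxSize; });
        _curSize += op.size();
        _queue.push(std::move(op));
    }

    // Consumer side: look at the front entry without removing it (cf. peek()).
    bool peek(Op& out) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_queue.empty()) return false;
        out = _queue.front();
        return true;
    }

    // Consumer side: drop the front entry and wake a blocked producer,
    // the same thing consume() does with _bufCond.notify_one().
    void consume() {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_queue.empty()) return;
        _curSize -= _queue.front().size();
        _queue.pop();
        _cond.notify_one();
    }

private:
    const size_t _maxSize;
    size_t _curSize;
    std::queue<Op> _queue;
    std::mutex _mutex;
    std::condition_variable _cond;
};

int main() {
    OpBuffer buffer(64);  // tiny cap so the producer actually blocks

    std::thread producer([&buffer] {
        for (int i = 0; i < 20; ++i) {
            buffer.push(Op{"op-" + std::to_string(i)});
        }
    });

    Op op;
    int applied = 0;
    while (applied < 20) {
        if (buffer.peek(op)) {
            std::cout << "applying " << op.payload << "\n";
            buffer.consume();
            ++applied;
        } else {
            std::this_thread::yield();  // buffer momentarily empty
        }
    }

    producer.join();
    return 0;
}
```

Bounding the queue by accumulated byte size rather than by entry count mirrors the `_bufSize += o.objsize()` / `_bufSize -= front.objsize()` bookkeeping in the diff above.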