author     Kristina <kristina@10gen.com>  2012-04-20 15:54:26 -0400
committer  Kristina <kristina@10gen.com>  2012-05-16 17:08:46 -0400
commit     5ba5abbb01583e637e7d380ab869bf4d695848af (patch)
tree       d801cc171c290f0e9fdd4855ed1d5dc0bd436bb3 /src/mongo/db/repl/bgsync.cpp
parent     adfe6f647fcbf7737c81fa747d8bf3599cc7d84c (diff)
Separate thread for fetching oplog ops SERVER-4392
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--  src/mongo/db/repl/bgsync.cpp  324
1 file changed, 281 insertions(+), 43 deletions(-)
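
This commit moves oplog fetching into its own producer thread: produce() tails the sync source and pushes ops into an in-memory buffer capped at _maxSize (200MB), while the applier drains it through peek()/consume(), with _bufCond coordinating the two sides when the buffer fills up. Below is a minimal standalone sketch of that bounded producer/consumer pattern, using std::thread and plain strings in place of the boost primitives and BSONObj documents used in the diff; OpBuffer and its methods are illustrative names only, not MongoDB code.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Hypothetical bounded op buffer; the real code caps _bufSize at 200MB of
// BSONObj data and uses boost::mutex / boost::condition_variable.
class OpBuffer {
public:
    explicit OpBuffer(std::size_t maxBytes) : _maxBytes(maxBytes), _bytes(0) {}

    // Producer side: block while the buffer is full, then enqueue and wake a consumer.
    void push(const std::string& op) {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [this] { return _bytes < _maxBytes; });
        _queue.push(op);
        _bytes += op.size();
        _cond.notify_one();
    }

    // Consumer side: block until an op is available, dequeue it, and wake the
    // producer in case it was waiting for room.
    std::string pop() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [this] { return !_queue.empty(); });
        std::string op = _queue.front();
        _queue.pop();
        _bytes -= op.size();
        _cond.notify_one();
        return op;
    }

private:
    const std::size_t _maxBytes;
    std::size_t _bytes;
    std::queue<std::string> _queue;
    std::mutex _mutex;
    std::condition_variable _cond;
};

int main() {
    OpBuffer buffer(64);  // tiny cap so the producer actually blocks in this demo

    std::thread producer([&buffer] {
        for (int i = 0; i < 20; ++i)
            buffer.push("op-" + std::to_string(i));
    });

    std::thread consumer([&buffer] {
        for (int i = 0; i < 20; ++i)
            std::cout << buffer.pop() << "\n";
    });

    producer.join();
    consumer.join();
    return 0;
}

In the patch below the same roles are split across threads the same way: producerThread()/produce() fill the buffer, and stop()/start() reset it around primary state changes.
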
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 830bfea87d2..eb45f2e02a2 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -15,6 +15,7 @@
*/
#include "mongo/pch.h"
+#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/client.h"
@@ -25,7 +26,13 @@ namespace replset {
BackgroundSyncInterface::~BackgroundSyncInterface() {}
- BackgroundSync::BackgroundSync() : _oplogMarkerTarget(NULL),
+ BackgroundSync::BackgroundSync() : _maxSize(200*1024*1024),
+ _bufSize(0),
+ _lastOpTimeFetched(0, 0),
+ _lastH(0),
+ _pause(true),
+ _currentSyncTarget(NULL),
+ _oplogMarkerTarget(NULL),
_oplogMarker(true /* doHandshake */),
_consumedOpTime(0, 0) {
}
@@ -38,6 +45,15 @@ namespace replset {
return s_instance;
}
+ void BackgroundSync::shutdown() {
+ boost::unique_lock<boost::mutex> lock(s_mutex);
+ if (s_instance == NULL) {
+ return;
+ }
+
+ s_instance->_bufCond.notify_all();
+ }
+
void BackgroundSync::notifierThread() {
Client::initThread("rsSyncNotifier");
replLocalAuth();
@@ -107,9 +123,6 @@ namespace replset {
}
bool BackgroundSync::hasCursor() {
- // temp
- Member *_currentSyncTarget = theReplSet->getSyncTarget();
-
{
// we don't need the global write lock yet, but it's needed by OplogReader::connect
// so we take it preemptively to avoid deadlocking.
@@ -142,74 +155,264 @@ namespace replset {
return _oplogMarker.haveCursor();
}
-} // namespace replset
+ void BackgroundSync::producerThread() {
+ Client::initThread("rsBackgroundSync");
+ replLocalAuth();
+
+ while (!inShutdown()) {
+ if (!theReplSet) {
+ log() << "replSet warning did not receive a valid config yet, sleeping 20 seconds " << rsLog;
+ sleepsecs(20);
+ continue;
+ }
+
+ try {
+ _producerThread();
+ }
+ catch (DBException& e) {
+ sethbmsg(str::stream() << "syncThread: " << e.toString());
+ sleepsecs(10);
+ }
+ catch (...) {
+ sethbmsg("unexpected exception in syncThread()");
+ sleepsecs(60);
+ }
+
+ sleepsecs(1);
+ }
+
+ cc().shutdown();
+ }
+
+ void BackgroundSync::_producerThread() {
+ MemberState state = theReplSet->state();
+
+ // we want to pause when the state changes to primary
+ if (state.primary()) {
+ if (!_pause) {
+ stop();
+ }
+ sleepsecs(1);
+ return;
+ }
+
+ if (state.fatal() || state.startup()) {
+ sleepsecs(5);
+ return;
+ }
+
+ // if this member has an empty oplog, we cannot start syncing
+ if (theReplSet->lastOpTimeWritten.isNull()) {
+ sleepsecs(1);
+ return;
+ }
+ // we want to unpause when we're no longer primary
+ // start() also loads _lastOpTimeFetched, which we know is set from the "if"
+ else if (_pause) {
+ start();
+ }
+
+ produce();
+ }
+
+ void BackgroundSync::produce() {
+ bool doHandshake = false;
+ OplogReader r(doHandshake);
+
+ // find a target to sync from the last op time written
+ getOplogReader(r);
+
+ // no server found
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+
+ if (_currentSyncTarget == NULL) {
+ sleepsecs(1);
+ // if there is no one to sync from
+ return;
+ }
+
+ r.tailingQueryGTE(rsoplog, _lastOpTimeFetched);
+ }
+
+ // if target cut connections between connecting and querying (for
+ // example, because it stepped down) we might not have a cursor
+ if (!r.haveCursor()) {
+ return;
+ }
+
+ uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );
+
+ if (isRollbackRequired(r)) {
+ stop();
+ return;
+ }
+
+ while (!inShutdown()) {
+ while (!inShutdown()) {
+ // if the buffer is full, wait for items to be removed
+ if (_bufSize >= _maxSize) {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _bufCond.wait(lock);
+
+ // if we produce [peanut, ..., peanut, whale] we'll have to consume more than
+ // one peanut before we're below _maxSize, so keep waiting until we are
+ break;
+ }
+
+ if (!r.moreInCurrentBatch()) {
+ if (theReplSet->gotForceSync()) {
+ return;
+ }
+ if (theReplSet->isPrimary()) {
+ return;
+ }
- bool ReplSetImpl::_isStale(OplogReader& r, const OpTime& startTs, BSONObj& remoteOldestOp) {
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (!_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
+ return;
+ }
+ }
+
+ r.more();
+ }
+
+ if (!r.more())
+ break;
+
+ BSONObj o = r.nextSafe();
+
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _buffer.push(o.getOwned());
+ _bufSize += o.objsize();
+ _lastH = o["h"].numberLong();
+ _lastOpTimeFetched = o["ts"]._opTime();
+ }
+ } // end while
+
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (_pause || !_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
+ return;
+ }
+ }
+
+ r.tailCheck();
+ if( !r.haveCursor() ) {
+ LOG(1) << "replSet end syncTail pass" << rsLog;
+ return;
+ }
+
+ // looping back is ok because this is a tailable cursor
+ }
+ }
+
+ BSONObj* BackgroundSync::peek() {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+
+ if (_currentSyncTarget != _oplogMarkerTarget &&
+ _currentSyncTarget != NULL) {
+ _oplogMarkerTarget = NULL;
+ }
+
+ if (_buffer.empty()) {
+ // it should already be 0, but just in case
+ _bufSize = 0;
+ return NULL;
+ }
+
+ return &_buffer.front();
+ }
+
+ void BackgroundSync::consume() {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ BSONObj& front = _buffer.front();
+
+ // remove from the queue first, in case catchup goes wrong
+ _bufSize -= front.objsize();
+ _buffer.pop();
+
+ // wake up producer, if it's waiting for docs to be consumed
+ _bufCond.notify_one();
+ }
+
+ bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) {
remoteOldestOp = r.findOne(rsoplog, Query());
OpTime remoteTs = remoteOldestOp["ts"]._opTime();
- DEV log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
- else LOG(3) << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
DEV {
- log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
- log() << "replSet our state: " << state().toString() << rsLog;
+ log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+ log() << "replSet lastOpTimeFetched: " << _lastOpTimeFetched.toStringLong() << rsLog;
}
- if( startTs >= remoteTs ) {
- return false;
+ LOG(3) << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+
+ if (_lastOpTimeFetched >= remoteTs) {
+ return false;
+ }
}
return true;
}
- Member* ReplSetImpl::_getOplogReader(OplogReader& r, const OpTime& minTS) {
- Member *target = 0, *stale = 0;
+ void BackgroundSync::getOplogReader(OplogReader& r) {
+ Member *target = NULL, *stale = NULL;
BSONObj oldest;
- verify(r.conn() == 0);
+ // then we're initial syncing and we're still waiting for this to be set
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (_lastOpTimeFetched.isNull()) {
+ _currentSyncTarget = NULL;
+ return;
+ }
+ }
+
+ verify(r.conn() == NULL);
- while ((target = getMemberToSyncTo()) != 0) {
+ while ((target = theReplSet->getMemberToSyncTo()) != NULL) {
string current = target->fullName();
- if( !r.connect(current) ) {
+ if (!r.connect(current)) {
log(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
r.resetConnection();
- veto(current);
+ theReplSet->veto(current);
continue;
}
- if( !minTS.isNull() && _isStale(r, minTS, oldest) ) {
+ if (isStale(r, oldest)) {
r.resetConnection();
- veto(current, 600);
+ theReplSet->veto(current, 600);
stale = target;
continue;
}
// if we made it here, the target is up and not stale
- return target;
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _currentSyncTarget = target;
+ }
+
+ return;
}
// the only viable sync target was stale
if (stale) {
- log() << "replSet error RS102 too stale to catch up, at least from " << stale->fullName() << rsLog;
- log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
- log() << "replSet oldest at " << stale->fullName() << " : " << oldest["ts"]._opTime().toStringLong() << rsLog;
- log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
-
- // reset minvalid so that we can't become primary prematurely
- {
- Lock::DBWrite lk("local.replset.minvalid");
- Helpers::putSingleton("local.replset.minvalid", oldest);
- }
-
- sethbmsg("error RS102 too stale to catch up");
- changeState(MemberState::RS_RECOVERING);
+ theReplSet->goStale(stale, oldest);
sleepsecs(120);
}
- return 0;
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _currentSyncTarget = NULL;
+ }
}
- bool ReplSetImpl::isRollbackRequired(OplogReader& r) {
+ bool BackgroundSync::isRollbackRequired(OplogReader& r) {
string hn = r.conn()->getServerAddress();
if (!r.more()) {
@@ -221,9 +424,9 @@ namespace replset {
return true;
}
OpTime theirTS = theirLastOp["ts"]._opTime();
- if (theirTS < lastOpTimeWritten) {
+ if (theirTS < theReplSet->lastOpTimeWritten) {
log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
- syncRollback(r);
+ theReplSet->syncRollback(r);
return true;
}
/* we're not ahead? maybe our new query got fresher data. best to come back and try again */
@@ -240,20 +443,55 @@ namespace replset {
BSONObj o = r.nextSafe();
OpTime ts = o["ts"]._opTime();
long long h = o["h"].numberLong();
- if( ts != lastOpTimeWritten || h != lastH ) {
- log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << rsLog;
+ if( ts != theReplSet->lastOpTimeWritten || h != theReplSet->lastH ) {
+ log() << "replSet our last op time written: " << theReplSet->lastOpTimeWritten.toStringPretty() << rsLog;
log() << "replset source's GTE: " << ts.toStringPretty() << rsLog;
- syncRollback(r);
+ theReplSet->syncRollback(r);
return true;
}
return false;
}
- Member* ReplSetImpl::getSyncTarget() {
- lock lk(this);
+ Member* BackgroundSync::getSyncTarget() {
+ boost::unique_lock<boost::mutex> lock(_mutex);
return _currentSyncTarget;
}
+ void BackgroundSync::stop() {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+
+ int popped = 0;
+
+ _pause = true;
+ _currentSyncTarget = NULL;
+ _lastOpTimeFetched = OpTime(0,0);
+ _lastH = 0;
+
+ // get rid of pending ops
+ while (!_buffer.empty()) {
+ _buffer.pop();
+ popped++;
+ }
+ _bufSize = 0;
+ if (popped > 0) {
+ log() << "replset " << popped << " ops were not applied from buffer, this should "
+ << "cause a rollback on the former primary" << rsLog;
+ }
+ }
+
+ void BackgroundSync::start() {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _pause = false;
+ massert(16160, "going to start syncing, but buffer is not empty", _buffer.empty() && _bufSize == 0);
+
+ // reset _last fields with current data
+ _lastOpTimeFetched = theReplSet->lastOpTimeWritten;
+ _lastH = theReplSet->lastH;
+
+ LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastH << rsLog;
+ }
+
+} // namespace replset
} // namespace mongo
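
For context, the consumer of this buffer is the oplog applier (outside this file), which is not part of this diff. The sketch below only illustrates the peek-then-consume contract the new methods are built around; BufferLike, the sample ops, and the control flow are hypothetical, and the real applier's batching and error handling are omitted.

#include <iostream>
#include <queue>
#include <string>

// Stand-in for the buffer API this diff adds to BackgroundSync; everything
// here is illustrative, not the real class.
struct BufferLike {
    std::queue<std::string> ops;

    // Like BackgroundSync::peek(): expose the next fetched op without
    // removing it, or a null pointer when nothing has been fetched yet.
    const std::string* peek() const {
        return ops.empty() ? nullptr : &ops.front();
    }

    // Like BackgroundSync::consume(): drop the op the applier just handled
    // (the real method also shrinks _bufSize and notifies the producer).
    void consume() { ops.pop(); }
};

int main() {
    BufferLike buffer;
    buffer.ops.push("insert { _id: 1 }");
    buffer.ops.push("update { _id: 1 }");

    // peek -> apply -> consume: an op leaves the buffer only after the
    // applier has looked at it.
    while (const std::string* op = buffer.peek()) {
        std::cout << "applying: " << *op << "\n";
        buffer.consume();
    }
    return 0;
}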