author    Scott Hernandez <scotthernandez@gmail.com>  2014-09-03 11:40:47 -0400
committer Scott Hernandez <scotthernandez@gmail.com>  2014-09-04 12:09:21 -0400
commit    dbf18450b53d4ee1df99c701120567219608adc0 (patch)
tree      bf8c0a4a0961a8ca27197bedf70ed396486653ec
parent    8f8e59a78107a079c5367abdc31580d20428d9cc (diff)
download  mongo-dbf18450b53d4ee1df99c701120567219608adc0.tar.gz
SERVER-14805: initialsync multithreaded oplog apply
-rw-r--r--  jstests/replsets/resync_with_write_load.js   82
-rw-r--r--  src/mongo/db/repl/rs.cpp                      11
-rw-r--r--  src/mongo/db/repl/rs.h                        12
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp         115
-rw-r--r--  src/mongo/db/repl/rs_sync.cpp                152
-rw-r--r--  src/mongo/db/repl/rs_sync.h                   57
-rw-r--r--  src/mongo/shell/replsettest.js                83
7 files changed, 325 insertions, 187 deletions
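The core of this change shows up in rs_sync.cpp below: the apply function (multiSyncApply or multiInitialSyncApply) becomes a member chosen at construction, and each batch of oplog entries is fanned out to a shared writer thread pool. As a rough orientation before the diff, here is a minimal, self-contained C++ sketch of that fan-out pattern. Op, applyOp, and the namespace-hash partitioning are stand-ins for the server's BSONObj ops, apply logic, and fillWriterVectors, not the actual implementation.

#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <thread>
#include <vector>

struct Op { std::string ns; int seq; };

static void applyOp(const Op& /*op*/) { /* apply one oplog entry to the data */ }

// Partition ops so all ops for one namespace land in the same vector; vectors
// can then be applied concurrently without reordering ops within a namespace.
static void fillWriterVectors(const std::deque<Op>& ops,
                              std::vector<std::vector<Op> >* writerVectors) {
    std::hash<std::string> hasher;
    for (const Op& op : ops)
        (*writerVectors)[hasher(op.ns) % writerVectors->size()].push_back(op);
}

// Dole the vectors out to writer threads and wait for all of them (the diff
// uses the replica set's shared ThreadPool; plain std::thread stands in here).
static void multiApply(const std::deque<Op>& ops, std::size_t numWriters) {
    std::vector<std::vector<Op> > writerVectors(numWriters);
    fillWriterVectors(ops, &writerVectors);
    std::vector<std::thread> writers;
    for (const std::vector<Op>& vec : writerVectors) {
        if (vec.empty())
            continue;
        const std::vector<Op>* work = &vec;
        writers.emplace_back([work] {
            for (const Op& op : *work) applyOp(op);
        });
    }
    for (std::thread& t : writers) t.join();  // batch boundary: all writers done
}

int main() {
    std::deque<Op> batch;
    batch.push_back(Op{"test.a", 1});
    batch.push_back(Op{"test.b", 2});
    batch.push_back(Op{"test.a", 3});
    multiApply(batch, 4);
}

Hashing by namespace keeps ops against one collection on one thread, preserving per-collection order while distinct collections apply in parallel.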
diff --git a/jstests/replsets/resync_with_write_load.js b/jstests/replsets/resync_with_write_load.js
new file mode 100644
index 00000000000..aaf53c91dd8
--- /dev/null
+++ b/jstests/replsets/resync_with_write_load.js
@@ -0,0 +1,82 @@
+/**
+ * This test creates a 2 node replica set and then puts load on the primary with writes during
+ * the resync in order to verify that all phases of the initial sync work correctly.
+ *
+ * We cannot test each phase of the initial sync directly, but by providing constant writes we can
+ * assume that each individual phase will have data to work with, and will therefore be tested.
+ */
+var replTest = new ReplSetTest({name: 'resync', nodes: 2, oplogSize: 100});
+var nodes = replTest.nodeList();
+
+var conns = replTest.startSet();
+var config = { "_id": "resync",
+ "members": [
+ {"_id": 0, "host": nodes[0], priority:4},
+ {"_id": 1, "host": nodes[1]}]
+ };
+var r = replTest.initiate(config);
+
+// Make sure we have a master
+var master = replTest.getMaster();
+var a_conn = conns[0];
+var b_conn = conns[1];
+a_conn.setSlaveOk();
+b_conn.setSlaveOk();
+var A = a_conn.getDB("test");
+var B = b_conn.getDB("test");
+var AID = replTest.getNodeId(a_conn);
+var BID = replTest.getNodeId(b_conn);
+assert(master == conns[0], "conns[0] assumed to be master");
+assert(a_conn.host == master.host);
+
+// create an oplog entry with an insert
+assert.writeOK( A.foo.insert({ x: 1 }, { writeConcern: { w: 1, wtimeout: 60000 }}));
+replTest.stop(BID);
+
+print("******************** starting load for 30 secs *********************");
+var work = 'var start=new Date().getTime(); db.timeToStartTrigger.insert({_id:1}); while(true) {for(x=0;x<1000;x++) {db["a" + x].insert({a:x})};sleep(1); if((new Date().getTime() - start) > 30000) break; }';
+
+// insert enough that the resync node has to go through oplog replay in each step
+var loadGen = startParallelShell( work, replTest.ports[0] );
+
+// wait for document to appear to continue
+assert.soon(function() {
+ try {
+ return 1 == a_conn.getDB("test")["timeToStartTrigger"].count();
+ } catch ( e ) {
+ print( e );
+ return false;
+ }
+}, "waited too long for start trigger");
+
+print("*************** STARTING node without data ***************");
+replTest.start(BID);
+// check that it is up
+assert.soon(function() {
+ try {
+ var result = b_conn.getDB("admin").runCommand({replSetGetStatus: 1});
+ return true;
+ } catch ( e ) {
+ print( e );
+ return false;
+ }
+}, "node didn't come up");
+
+print("waiting for load generation to finish");
+loadGen();
+
+// load must stop before we await replication.
+replTest.awaitReplication();
+
+// Make sure oplogs match
+try {
+ replTest.ensureOplogsMatch();
+} catch (e) {
+ var aDBHash = A.runCommand("dbhash");
+ var bDBHash = B.runCommand("dbhash");
+ assert.eq(aDBHash.md5, bDBHash.md5,
+ "hashes differ: " + tojson(aDBHash) + " to " + tojson(bDBHash));
+}
+replTest.stopSet();
+
+print("*****test done******");
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
index 0e733b7f489..efcb546f2d0 100644
--- a/src/mongo/db/repl/rs.cpp
+++ b/src/mongo/db/repl/rs.cpp
@@ -1010,14 +1010,13 @@ namespace {
}
return false;
}
+ void ReplSetImpl::setMinValid(OpTime ts) {
+ Lock::DBWrite lk( "local" );
+ Helpers::putSingleton("local.replset.minvalid", BSON("$set" << BSON("ts" << ts)));
+ }
void ReplSetImpl::setMinValid(BSONObj obj) {
- BSONObjBuilder builder;
- BSONObjBuilder subobj(builder.subobjStart("$set"));
- subobj.appendTimestamp("ts", obj["ts"].date());
- subobj.done();
- Lock::DBWrite lk( "local" );
- Helpers::putSingleton("local.replset.minvalid", builder.obj());
+ setMinValid(obj["ts"]._opTime());
}
OpTime ReplSetImpl::getMinValid() {
diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h
index 46bed5b53a1..e9ccbbadc48 100644
--- a/src/mongo/db/repl/rs.h
+++ b/src/mongo/db/repl/rs.h
@@ -569,12 +569,11 @@ namespace mongo {
friend class Consensus;
private:
- bool _syncDoInitialSync_clone(Cloner &cloner, const char *master,
- const list<string>& dbs, bool dataPass);
- bool _syncDoInitialSync_applyToHead( replset::SyncTail& syncer, OplogReader* r ,
- const Member* source, const BSONObj& lastOp,
- BSONObj& minValidOut);
- void _syncDoInitialSync();
+ bool _initialSyncClone(Cloner &cloner, const std::string& master,
+ const list<string>& dbs, bool dataPass);
+ bool _initialSyncApplyOplog(replset::SyncTail& syncer, OplogReader* r ,
+ const Member* source);
+ void _initialSync();
void syncDoInitialSync();
void _syncThread();
void syncTail();
@@ -626,6 +625,7 @@ namespace mongo {
* applied.
*/
static void setMinValid(BSONObj obj);
+ static void setMinValid(OpTime ts);
static OpTime getMinValid();
static void clearInitialSyncFlag();
static bool getInitialSyncFlag();
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 98369fe801c..dcede45b7db 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -66,7 +66,7 @@ namespace mongo {
int failedAttempts = 0;
while ( failedAttempts < maxFailedAttempts ) {
try {
- _syncDoInitialSync();
+ _initialSync();
break;
}
catch(DBException& e) {
@@ -81,8 +81,8 @@ namespace mongo {
fassert( 16233, failedAttempts < maxFailedAttempts);
}
- bool ReplSetImpl::_syncDoInitialSync_clone(Cloner& cloner, const char *master,
- const list<string>& dbs, bool dataPass) {
+ bool ReplSetImpl::_initialSyncClone(Cloner& cloner, const std::string& master,
+ const list<string>& dbs, bool dataPass) {
for( list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++ ) {
string db = *i;
@@ -109,7 +109,7 @@ namespace mongo {
options.syncData = dataPass;
options.syncIndexes = ! dataPass;
- if (!cloner.go(ctx.ctx(), master, options, NULL, err, &errCode)) {
+ if (!cloner.go(ctx.ctx(), master.c_str(), options, NULL, err, &errCode)) {
sethbmsg(str::stream() << "initial sync: error while "
<< (dataPass ? "cloning " : "indexing ") << db
<< ". " << (err.empty() ? "" : err + ". ")
@@ -266,26 +266,20 @@ namespace mongo {
* @param syncer either initial sync (can reclone missing docs) or "normal" sync (no recloning)
* @param r the oplog reader
* @param source the sync target
- * @param lastOp the op to start syncing at. replset::InitialSync writes this and then moves to
- * the queue. replset::SyncTail does not write this, it moves directly to the
- * queue.
- * @param minValid populated by this function. The most recent op on the sync target's oplog,
- * this function syncs to this value (inclusive)
* @return if applying the oplog succeeded
*/
- bool ReplSetImpl::_syncDoInitialSync_applyToHead( replset::SyncTail& syncer, OplogReader* r,
- const Member* source, const BSONObj& lastOp ,
- BSONObj& minValid ) {
- /* our cloned copy will be strange until we apply oplog events that occurred
- through the process. we note that time point here. */
-
+ bool ReplSetImpl::_initialSyncApplyOplog( replset::SyncTail& syncer,
+ OplogReader* r,
+ const Member* source) {
+ const OpTime startOpTime = lastOpTimeWritten;
+ BSONObj lastOp;
try {
// It may have been a long time since we last used this connection to
// query the oplog, depending on the size of the databases we needed to clone.
// A common problem is that TCP keepalives are set too infrequent, and thus
// our connection here is terminated by a firewall due to inactivity.
// Solution is to increase the TCP keepalive frequency.
- minValid = r->getLastOp(rsoplog);
+ lastOp = r->getLastOp(rsoplog);
} catch ( SocketException & ) {
log() << "connection lost to " << source->h().toString() << "; is your tcp keepalive interval set appropriately?";
if( !r->connect(source->h().toString()) ) {
@@ -293,42 +287,43 @@ namespace mongo {
throw;
}
// retry
- minValid = r->getLastOp(rsoplog);
+ lastOp = r->getLastOp(rsoplog);
}
- isyncassert( "getLastOp is empty ", !minValid.isEmpty() );
-
- OpTime mvoptime = minValid["ts"]._opTime();
- verify( !mvoptime.isNull() );
+ isyncassert( "lastOp is empty ", !lastOp.isEmpty() );
- OpTime startingTS = lastOp["ts"]._opTime();
- verify( mvoptime >= startingTS );
+ OpTime stopOpTime = lastOp["ts"]._opTime();
- // apply startingTS..mvoptime portion of the oplog
- {
- try {
- minValid = syncer.oplogApplication(lastOp, minValid);
- }
- catch (const DBException&) {
- log() << "replSet initial sync failed during oplog application phase" << rsLog;
+ // If we already have what we need then return.
+ if (stopOpTime == startOpTime)
+ return true;
- emptyOplog(); // otherwise we'll be up!
+ verify( !stopOpTime.isNull() );
+ verify( stopOpTime > startOpTime );
- lastOpTimeWritten = OpTime();
- lastH = 0;
+ // apply till lastOpTime
+ try {
+ syncer.oplogApplication(stopOpTime);
+ }
+ catch (const DBException&) {
+ log() << "replSet initial sync failed during oplog application phase" << rsLog;
- log() << "replSet cleaning up [1]" << rsLog;
- {
- Client::WriteContext cx( "local." );
- cx.ctx().db()->flushFiles(true);
- }
- log() << "replSet cleaning up [2]" << rsLog;
+ emptyOplog(); // otherwise we'll be up!
- log() << "replSet initial sync failed will try again" << endl;
+ lastOpTimeWritten = OpTime();
+ lastH = 0;
- sleepsecs(5);
- return false;
+ log() << "replSet cleaning up [1]" << rsLog;
+ {
+ Client::WriteContext cx( "local." );
+ cx.ctx().db()->flushFiles(true);
}
+ log() << "replSet cleaning up [2]" << rsLog;
+
+ log() << "replSet initial sync failed will try again" << endl;
+
+ sleepsecs(5);
+ return false;
}
return true;
@@ -337,7 +332,7 @@ namespace mongo {
/**
* Do the initial sync for this member. There are several steps to this process:
*
- * 0. Add _initialSyncFlag to minValid to tell us to restart initial sync if we
+ * 0. Add _initialSyncFlag to minValid collection to tell us to restart initial sync if we
* crash in the middle of this procedure
* 1. Record start time.
* 2. Clone.
@@ -348,14 +343,14 @@ namespace mongo {
* 7. Build indexes.
* 8. Set minValid3 to sync target's latest op time.
* 9. Apply ops from minValid2 to minValid3.
- 10. Clean up minValid and remove _initialSyncFlag field
+ 10. Clean up minValid collection: remove _initialSyncFlag field, set ts to minValid3 OpTime
*
* At that point, initial sync is finished. Note that the oplog from the sync target is applied
* three times: step 4, 6, and 8. 4 may involve refetching, 6 should not. By the end of 6,
* this member should have consistent data. 8 is "cosmetic," it is only to get this member
- * closer to the latest op time before it can transition to secondary state.
+ * closer to the latest op time before it can transition out of startup state.
*/
- void ReplSetImpl::_syncDoInitialSync() {
+ void ReplSetImpl::_initialSync() {
replset::InitialSync init(replset::BackgroundSync::get());
replset::SyncTail tail(replset::BackgroundSync::get());
sethbmsg("initial sync pending",0);
@@ -389,14 +384,12 @@ namespace mongo {
return;
}
- // written by applyToHead calls
- BSONObj minValid;
-
if (replSettings.fastsync) {
log() << "fastsync: skipping database clone" << rsLog;
// prime oplog
- init.oplogApplication(lastOp, lastOp);
+ init.syncApply(lastOp, false);
+ _logOpObjRS(lastOp);
return;
}
else {
@@ -411,7 +404,7 @@ namespace mongo {
list<string> dbs = r.conn()->getDatabaseNames();
Cloner cloner;
- if (!_syncDoInitialSync_clone(cloner, sourceHostname.c_str(), dbs, true)) {
+ if (!_initialSyncClone(cloner, r.conn()->getServerAddress(), dbs, true)) {
veto(source->fullName(), 600);
sleepsecs(300);
return;
@@ -419,26 +412,26 @@ namespace mongo {
sethbmsg("initial sync data copy, starting syncup",0);
+ // prime oplog
+ init.syncApply(lastOp, false);
+ _logOpObjRS(lastOp);
+
log() << "oplog sync 1 of 3" << endl;
- if ( ! _syncDoInitialSync_applyToHead( init, &r , source , lastOp , minValid ) ) {
+ if (!_initialSyncApplyOplog( init, &r , source)) {
return;
}
- lastOp = minValid;
-
// Now we sync to the latest op on the sync target _again_, as we may have recloned ops
// that were "from the future" compared with minValid. During this second application,
// nothing should need to be recloned.
log() << "oplog sync 2 of 3" << endl;
- if (!_syncDoInitialSync_applyToHead(tail, &r , source , lastOp , minValid)) {
+ if (!_initialSyncApplyOplog(tail, &r , source)) {
return;
}
// data should now be consistent
- lastOp = minValid;
-
sethbmsg("initial sync building indexes",0);
- if (!_syncDoInitialSync_clone(cloner, sourceHostname.c_str(), dbs, false)) {
+ if (!_initialSyncClone(cloner, r.conn()->getServerAddress(), dbs, false)) {
veto(source->fullName(), 600);
sleepsecs(300);
return;
@@ -446,7 +439,7 @@ namespace mongo {
}
log() << "oplog sync 3 of 3" << endl;
- if (!_syncDoInitialSync_applyToHead(tail, &r, source, lastOp, minValid)) {
+ if (!_initialSyncApplyOplog(tail, &r, source)) {
return;
}
@@ -466,13 +459,13 @@ namespace mongo {
Client::WriteContext cx( "local." );
cx.ctx().db()->flushFiles(true);
try {
- log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog;
+ log() << "replSet set minValid=" << lastOpTimeWritten << rsLog;
}
catch(...) { }
// Initial sync is now complete. Flag this by setting minValid to the last thing
// we synced.
- theReplSet->setMinValid(minValid);
+ theReplSet->setMinValid(lastOpTimeWritten);
// Clear the initial sync flag.
theReplSet->clearInitialSyncFlag();
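The step list documented in _initialSync above compresses into a short control flow. Here is a condensed sketch of the pass structure the rewritten function follows; the types and helpers (SyncSource, applyOplogUntil, and so on) are illustrative stand-ins, not the server's API.

#include <iostream>

typedef unsigned long long OpTime;   // stand-in for mongo::OpTime
struct SyncSource { OpTime tip; };   // stand-in for the sync target

static OpTime sourceLastOpTime(SyncSource& s) { return s.tip; }  // r->getLastOp
static void cloneData(SyncSource&)    { std::cout << "clone data\n"; }     // steps 2-3
static void buildIndexes(SyncSource&) { std::cout << "build indexes\n"; }  // step 7
static void setMinValid(OpTime t)     { std::cout << "minValid=" << t << "\n"; }
static bool applyOplogUntil(SyncSource&, OpTime end, bool reclone) {
    std::cout << "apply oplog to " << end << (reclone ? " (reclone ok)" : "") << "\n";
    return true;  // false would mean "clean up and retry initial sync"
}

static bool initialSync(SyncSource& src) {
    cloneData(src);
    // Pass 1 (steps 3-4): ops may reference docs the clone missed; reclone them.
    if (!applyOplogUntil(src, sourceLastOpTime(src), true)) return false;
    // Pass 2 (steps 5-6): recloned docs can be "from the future"; reapply so the
    // data becomes consistent. Nothing should need recloning now.
    if (!applyOplogUntil(src, sourceLastOpTime(src), false)) return false;
    buildIndexes(src);
    // Pass 3 (steps 8-9): cosmetic catch-up toward the source's tip before this
    // member may leave startup state.
    if (!applyOplogUntil(src, sourceLastOpTime(src), false)) return false;
    setMinValid(src.tip);  // step 10: durably mark initial sync complete
    return true;
}

int main() {
    SyncSource src;
    src.tip = 10;
    return initialSync(src) ? 0 : 1;
}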
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index a51565948d7..0f993a3bb87 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -87,8 +87,10 @@ namespace replset {
SyncTail::SyncTail(BackgroundSyncInterface *q) :
- Sync(""), oplogVersion(0), _networkQueue(q)
- {}
+ Sync(""), oplogVersion(0), _networkQueue(q), _applyFunc(multiSyncApply) {}
+
+ SyncTail::SyncTail(BackgroundSyncInterface *q, MultiSyncApplyFunc func) :
+ Sync(""), oplogVersion(0), _networkQueue(q), _applyFunc(func) {}
SyncTail::~SyncTail() {}
@@ -244,23 +246,21 @@ namespace replset {
}
// Doles out all the work to the writer pool threads and waits for them to complete
- void SyncTail::applyOps(const std::vector< std::vector<BSONObj> >& writerVectors,
- MultiSyncApplyFunc applyFunc) {
+ void SyncTail::applyOps(const std::vector< std::vector<BSONObj> >& writerVectors) {
ThreadPool& writerPool = theReplSet->getWriterPool();
TimerHolder timer(&applyBatchStats);
for (std::vector< std::vector<BSONObj> >::const_iterator it = writerVectors.begin();
it != writerVectors.end();
++it) {
if (!it->empty()) {
- writerPool.schedule(applyFunc, boost::cref(*it), this);
+ writerPool.schedule(_applyFunc, boost::cref(*it), this);
}
}
writerPool.join();
}
// Doles out all the work to the writer pool threads and waits for them to complete
- void SyncTail::multiApply( std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc ) {
-
+ void SyncTail::multiApply( std::deque<BSONObj>& ops) {
// Use a ThreadPool to prefetch all the operations in a batch.
prefetchOps(ops);
@@ -275,7 +275,7 @@ namespace replset {
// stop all readers until we're done
Lock::ParallelBatchWriterMode pbwm;
- applyOps(writerVectors, applyFunc);
+ applyOps(writerVectors);
}
@@ -297,88 +297,24 @@ namespace replset {
InitialSync::InitialSync(BackgroundSyncInterface *q) :
- SyncTail(q) {}
+ SyncTail(q, multiInitialSyncApply) {}
InitialSync::~InitialSync() {}
- BSONObj SyncTail::oplogApplySegment(const BSONObj& applyGTEObj, const BSONObj& minValidObj,
- MultiSyncApplyFunc func) {
- OpTime applyGTE = applyGTEObj["ts"]._opTime();
- OpTime minValid = minValidObj["ts"]._opTime();
-
- // We have to keep track of the last op applied to the data, because there's no other easy
- // way of getting this data synchronously. Batches may go past minValidObj, so we need to
- // know to bump minValid past minValidObj.
- BSONObj lastOp = applyGTEObj;
- OpTime ts = applyGTE;
-
- time_t start = time(0);
- time_t now = start;
-
- unsigned long long n = 0, lastN = 0;
-
- while( ts < minValid ) {
- OpQueue ops;
-
- while (ops.getSize() < replBatchLimitBytes) {
- if (tryPopAndWaitForMore(&ops)) {
- break;
- }
-
- // apply replication batch limits
- now = time(0);
- if (!ops.empty()) {
- if (now > replBatchLimitSeconds)
- break;
- if (ops.getDeque().size() > replBatchLimitOperations)
- break;
- }
- }
- setOplogVersion(ops.getDeque().front());
-
- multiApply(ops.getDeque(), func);
-
- n += ops.getDeque().size();
-
- if ( n > lastN + 1000 ) {
- if (now - start > 10) {
- // simple progress metering
- log() << "replSet initialSyncOplogApplication applied " << n << " operations, synced to "
- << ts.toStringPretty() << rsLog;
- start = now;
- lastN = n;
- }
- }
-
- // we want to keep a record of the last op applied, to compare with minvalid
- lastOp = ops.getDeque().back();
- OpTime tempTs = lastOp["ts"]._opTime();
- applyOpsToOplog(&ops.getDeque());
-
- ts = tempTs;
- }
-
- return lastOp;
- }
-
/* initial oplog application, during initial sync, after cloning.
*/
- BSONObj InitialSync::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) {
+ void InitialSync::oplogApplication(const OpTime& endOpTime) {
if (replSetForceInitialSyncFailure > 0) {
- log() << "replSet test code invoked, forced InitialSync failure: " << replSetForceInitialSyncFailure << rsLog;
+ log() << "replSet test code invoked, forced InitialSync failure: "
+ << replSetForceInitialSyncFailure << rsLog;
replSetForceInitialSyncFailure--;
throw DBException("forced error",0);
}
-
- // create the initial oplog entry
- syncApply(applyGTEObj);
- _logOpObjRS(applyGTEObj);
-
- return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply);
+ _applyOplogUntil(endOpTime);
}
- BSONObj SyncTail::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) {
- return oplogApplySegment(applyGTEObj, minValidObj, multiSyncApply);
+ void SyncTail::oplogApplication(const OpTime& endOpTime) {
+ _applyOplogUntil(endOpTime);
}
void SyncTail::setOplogVersion(const BSONObj& op) {
@@ -394,6 +330,56 @@ namespace replset {
}
}
+ void SyncTail::_applyOplogUntil(const OpTime& endOpTime) {
+ while (true) {
+ OpQueue ops;
+
+ verify( !Lock::isLocked() );
+
+ while (!tryPopAndWaitForMore(&ops)) {
+ // nothing came back last time, so go again
+ if (ops.empty()) continue;
+
+ // Check if we reached the end
+ const BSONObj currentOp = ops.back();
+ const OpTime currentOpTime = currentOp["ts"]._opTime();
+
+ // When we reach the end return this batch
+ if (currentOpTime == endOpTime) {
+ break;
+ }
+ else if (currentOpTime > endOpTime) {
+ severe() << "Applied past expected end " << endOpTime << " to " << currentOpTime
+ << " without seeing it. Rollback?" << rsLog;
+ fassertFailedNoTrace(0);
+ }
+
+ // apply replication batch limits
+ if (ops.getSize() > replBatchLimitBytes)
+ break;
+ if (ops.getDeque().size() > replBatchLimitOperations)
+ break;
+ };
+
+ if (ops.empty()) {
+ severe() << "got no ops for batch...";
+ fassertFailedNoTrace(0);
+ }
+
+ const BSONObj lastOp = ops.back().getOwned();
+
+ multiApply(ops.getDeque());
+ applyOpsToOplog(&ops.getDeque());
+
+ setOplogVersion(lastOp);
+
+ // if the last op applied was our end, return
+ if (theReplSet->lastOpTimeWritten == endOpTime) {
+ return;
+ }
+ } // end of while (true)
+ }
+
/* tail an oplog. ok to return, will be re-called. */
void SyncTail::oplogApplication() {
while( 1 ) {
@@ -458,7 +444,7 @@ namespace replset {
const int slaveDelaySecs = theReplSet->myConfig().slaveDelay;
if (!ops.empty() && slaveDelaySecs > 0) {
- const BSONObj& lastOp = ops.getDeque().back();
+ const BSONObj& lastOp = ops.back();
const unsigned int opTimestampSecs = lastOp["ts"]._opTime().getSecs();
// Stop the batch as the lastOp is too new to be applied. If we continue
@@ -479,7 +465,7 @@ namespace replset {
sleepmillis(0);
}
- const BSONObj& lastOp = ops.getDeque().back();
+ const BSONObj& lastOp = ops.back();
setOplogVersion(lastOp);
handleSlaveDelay(lastOp);
@@ -488,7 +474,7 @@ namespace replset {
// if we should crash and restart before updating the oplog
theReplSet->setMinValid(lastOp);
- multiApply(ops.getDeque(), multiSyncApply);
+ multiApply(ops.getDeque());
applyOpsToOplog(&ops.getDeque());
@@ -526,7 +512,7 @@ namespace replset {
return true;
}
- const char* ns = op["ns"].valuestrsafe();
+ const char* ns = op["ns"].valuestrsafe();
// check for commands
if ((op["op"].valuestrsafe()[0] == 'c') ||
diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h
index 392cbf5bd5b..4fe09072064 100644
--- a/src/mongo/db/repl/rs_sync.h
+++ b/src/mongo/db/repl/rs_sync.h
@@ -49,32 +49,15 @@ namespace replset {
typedef void (*MultiSyncApplyFunc)(const std::vector<BSONObj>& ops, SyncTail* st);
public:
SyncTail(BackgroundSyncInterface *q);
+ SyncTail(BackgroundSyncInterface *q, MultiSyncApplyFunc func);
+
virtual ~SyncTail();
virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false);
/**
- * Apply ops from applyGTEObj's ts to at least minValidObj's ts. Note that, due to
- * batching, this may end up applying ops beyond minValidObj's ts.
- *
- * @param applyGTEObj the op to start replicating at. This is actually not used except in
- * comparison to minValidObj: the background sync thread keeps its own
- * record of where we're synced to and starts providing ops from that
- * point.
- * @param minValidObj the op to finish syncing at. This function cannot return (other than
- * fatally erroring out) without applying at least this op.
- * @param func whether this should use initial sync logic (recloning docs) or
- * "normal" logic.
- * @return BSONObj the op that was synced to. This may be greater than minValidObj, as a
- * single batch might blow right by minvalid. If applyGTEObj is the same
- * op as minValidObj, this will be applyGTEObj.
- */
- BSONObj oplogApplySegment(const BSONObj& applyGTEObj, const BSONObj& minValidObj,
- MultiSyncApplyFunc func);
-
- /**
- * Runs oplogApplySegment without allowing recloning documents.
+ * Runs _applyOplogUntil(stopOpTime)
*/
- virtual BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj);
+ virtual void oplogApplication(const OpTime& stopOpTime);
void oplogApplication();
bool peek(BSONObj* obj);
@@ -91,6 +74,11 @@ namespace replset {
bool empty() {
return _deque.empty();
}
+ BSONObj back(){
+ verify(!_deque.empty());
+ return _deque.back();
+ }
+
private:
std::deque<BSONObj> _deque;
size_t _size;
@@ -113,9 +101,18 @@ namespace replset {
static const int replBatchLimitSeconds = 1;
static const unsigned int replBatchLimitOperations = 5000;
- // Prefetch and write a deque of operations, using the supplied function.
- // Initial Sync and Sync Tail each use a different function.
- void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc);
+ // Prefetch and write a deque of operations.
+ void multiApply(std::deque<BSONObj>& ops);
+
+ /**
+ * Applies oplog entries until reaching "endOpTime".
+ *
+ * Stops once the batch containing "endOpTime" has been applied.
+ *
+ * NOTE: Will not transition or check replica set member states.
+ */
+ void _applyOplogUntil(const OpTime& endOpTime);
+
// The version of the last op to be read
int oplogVersion;
@@ -123,14 +120,16 @@ namespace replset {
private:
BackgroundSyncInterface* _networkQueue;
+ // Function to use during applyOps
+ MultiSyncApplyFunc _applyFunc;
+
// Doles out all the work to the reader pool threads and waits for them to complete
void prefetchOps(const std::deque<BSONObj>& ops);
// Used by the thread pool readers to prefetch an op
static void prefetchOp(const BSONObj& op);
// Doles out all the work to the writer pool threads and waits for them to complete
- void applyOps(const std::vector< std::vector<BSONObj> >& writerVectors,
- MultiSyncApplyFunc applyFunc);
+ void applyOps(const std::vector< std::vector<BSONObj> >& writerVectors);
void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors);
@@ -145,12 +144,8 @@ namespace replset {
public:
virtual ~InitialSync();
InitialSync(BackgroundSyncInterface *q);
+ virtual void oplogApplication(const OpTime& stopOpTime);
- /**
- * Creates the initial oplog entry: applies applyGTEObj and writes it to the oplog. Then
- * this runs oplogApplySegment allowing recloning documents.
- */
- BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj);
};
// TODO: move hbmsg into an error-keeping class (SERVER-4444)
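The header change above is essentially the strategy pattern moved from call sites into the constructor: SyncTail defaults to the steady-state apply function, InitialSync supplies the reclone-capable one, and multiApply/applyOps no longer need a function argument. A toy, runnable sketch of the shape — the names mirror the diff, but everything else is a stand-in:

#include <iostream>
#include <vector>

struct Op { int id; };
struct SyncTail;  // forward declaration for the function-pointer type
typedef void (*ApplyFunc)(const std::vector<Op>&, SyncTail*);

static void multiSyncApply(const std::vector<Op>&, SyncTail*) {
    std::cout << "steady-state apply\n";
}
static void multiInitialSyncApply(const std::vector<Op>&, SyncTail*) {
    std::cout << "initial-sync apply (may reclone missing docs)\n";
}

struct SyncTail {
    SyncTail() : _applyFunc(multiSyncApply) {}               // old default path
    explicit SyncTail(ApplyFunc func) : _applyFunc(func) {}  // new constructor
    void applyOps(const std::vector<Op>& ops) { _applyFunc(ops, this); }
private:
    ApplyFunc _applyFunc;  // chosen once at construction, used for every batch
};

struct InitialSync : SyncTail {
    InitialSync() : SyncTail(multiInitialSyncApply) {}
};

int main() {
    std::vector<Op> ops;
    ops.push_back(Op{1});
    SyncTail tail;
    tail.applyOps(ops);     // steady-state path
    InitialSync initial;
    initial.applyOps(ops);  // reclone-capable path
}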
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index b0a05b70229..b408009d875 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -816,7 +816,90 @@ ReplSetTest.prototype.stopSet = function( signal , forRestart, opts ) {
print('ReplSetTest stopSet *** Shut down repl set - test worked ****' )
};
+/**
+ * Walks all oplogs and ensures matching entries.
+ */
+ReplSetTest.prototype.ensureOplogsMatch = function() {
+ "use strict";
+ var OplogReader = function(mongo) {
+ this.next = function() {
+ if (!this.cursor)
+ throw Error("reader is not open!");
+
+ var nextDoc = this.cursor.next();
+ if (nextDoc)
+ this.lastDoc = nextDoc;
+ return nextDoc;
+ };
+
+ this.getLastDoc = function() {
+ if (this.lastDoc)
+ return this.lastDoc;
+ return this.next();
+ };
+
+ this.hasNext = function() {
+ if (!this.cursor)
+ throw Error("reader is not open!");
+ return this.cursor.hasNext();
+ };
+
+ this.query = function(ts) {
+ var coll = this.getOplogColl();
+ var query = {"ts": {"$gte": ts ? ts : new Timestamp()}};
+ this.cursor = coll.find(query).sort({$natural:1})
+ this.cursor.addOption(DBQuery.Option.oplogReplay);
+ };
+
+ this.getFirstDoc = function(){
+ return this.getOplogColl().find().sort({$natural:1}).limit(-1).next();
+ };
+
+ this.getOplogColl = function () {
+ return this.mongo.getDB("local")["oplog.rs"];
+ }
+
+ this.lastDoc = null;
+ this.cursor = null;
+ this.mongo = mongo;
+ };
+
+ if (this.nodes.length && this.nodes.length > 1) {
+ var readers = [];
+ var largestTS = null;
+ var nodes = this.nodes;
+ var rsSize = nodes.length;
+ for (var i = 0; i < rsSize; i++) {
+ readers[i] = new OplogReader(nodes[i]);
+ var currTS = readers[i].getFirstDoc()["ts"];
+ if (largestTS === null || currTS.t > largestTS.t || (currTS.t == largestTS.t && currTS.i > largestTS.i)) {
+ largestTS = currTS;
+ }
+ }
+
+ // start all oplogReaders at the same place.
+ for (var i = 0; i < rsSize; i++) {
+ readers[i].query(largestTS);
+ }
+ var firstReader = readers[0];
+ while (firstReader.hasNext()) {
+ var ts = firstReader.next()["ts"];
+ for(var i = 1; i < rsSize; i++) {
+ assert.eq(ts,
+ readers[i].next()["ts"],
+ " non-matching ts for node: " + readers[i].mongo);
+ }
+ }
+
+ // ensure no other node has more oplog
+ for (var i = 1; i < rsSize; i++) {
+ assert.eq(false,
+ readers[i].hasNext(),
+ "" + readers[i] + " shouldn't have more oplog.");
+ }
+ }
+}
/**
* Waits until there is a master node
*/
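ensureOplogsMatch, added above, does a lockstep walk: find the newest "first entry" across all nodes, start every reader there, advance all cursors together asserting identical timestamps, and finally assert no node has extra entries. A compact sketch of the same algorithm over plain sorted timestamp vectors, an assumed simplification of the oplog cursors (non-empty oplogs assumed):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

typedef unsigned long long Timestamp;  // stand-in for the {t, i} Timestamp pair

// oplogs[i] is node i's oplog as a sorted, non-empty list of timestamps.
static void ensureOplogsMatch(const std::vector<std::vector<Timestamp> >& oplogs) {
    if (oplogs.size() < 2) return;
    // Start every reader at the largest "first entry" so all ranges overlap.
    Timestamp start = 0;
    for (std::size_t i = 0; i < oplogs.size(); ++i)
        start = std::max(start, oplogs[i].front());
    std::vector<std::size_t> pos(oplogs.size());
    for (std::size_t i = 0; i < oplogs.size(); ++i)
        pos[i] = std::lower_bound(oplogs[i].begin(), oplogs[i].end(), start) -
                 oplogs[i].begin();   // the $gte query in OplogReader.query
    // Advance all readers in lockstep; every node must yield the same ts.
    while (pos[0] < oplogs[0].size()) {
        Timestamp ts = oplogs[0][pos[0]++];
        for (std::size_t i = 1; i < oplogs.size(); ++i) {
            assert(pos[i] < oplogs[i].size() && oplogs[i][pos[i]] == ts);
            ++pos[i];
        }
    }
    // No other node may have more oplog than the first.
    for (std::size_t i = 1; i < oplogs.size(); ++i)
        assert(pos[i] == oplogs[i].size());
}

int main() {
    std::vector<std::vector<Timestamp> > oplogs(2);
    Timestamp a[] = {1, 2, 3, 4};
    Timestamp b[] = {2, 3, 4};
    oplogs[0].assign(a, a + 4);
    oplogs[1].assign(b, b + 3);
    ensureOplogsMatch(oplogs);  // both logs agree from ts=2 onward
}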