diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2014-09-03 11:40:47 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2014-09-04 12:09:21 -0400 |
commit | dbf18450b53d4ee1df99c701120567219608adc0 (patch) | |
tree | bf8c0a4a0961a8ca27197bedf70ed396486653ec | |
parent | 8f8e59a78107a079c5367abdc31580d20428d9cc (diff) | |
download | mongo-dbf18450b53d4ee1df99c701120567219608adc0.tar.gz |
SERVER-14805: initialsync multithreaded oplog apply
-rw-r--r-- | jstests/replsets/resync_with_write_load.js | 82 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 115 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.h | 57 | ||||
-rw-r--r-- | src/mongo/shell/replsettest.js | 83 |
7 files changed, 325 insertions, 187 deletions
diff --git a/jstests/replsets/resync_with_write_load.js b/jstests/replsets/resync_with_write_load.js new file mode 100644 index 00000000000..aaf53c91dd8 --- /dev/null +++ b/jstests/replsets/resync_with_write_load.js @@ -0,0 +1,82 @@ +/** + * This test creates a 2 node replica set and then puts load on the primary with writes during + * the resync in order to verify that all phases of the initial sync work correctly. + * + * We cannot test each phase of the initial sync directly but by providing constant writes we can + * assume that each individual phase will have data to work with, and therefore tested. + */ +var replTest = new ReplSetTest({name: 'resync', nodes: 2, oplogSize: 100}); +var nodes = replTest.nodeList(); + +var conns = replTest.startSet(); +var config = { "_id": "resync", + "members": [ + {"_id": 0, "host": nodes[0], priority:4}, + {"_id": 1, "host": nodes[1]}] + }; +var r = replTest.initiate(config); + +// Make sure we have a master +var master = replTest.getMaster(); +var a_conn = conns[0]; +var b_conn = conns[1]; +a_conn.setSlaveOk(); +b_conn.setSlaveOk(); +var A = a_conn.getDB("test"); +var B = b_conn.getDB("test"); +var AID = replTest.getNodeId(a_conn); +var BID = replTest.getNodeId(b_conn); +assert(master == conns[0], "conns[0] assumed to be master"); +assert(a_conn.host == master.host); + +// create an oplog entry with an insert +assert.writeOK( A.foo.insert({ x: 1 }, { writeConcern: { w: 1, wtimeout: 60000 }})); +replTest.stop(BID); + +print("******************** starting load for 30 secs *********************"); +var work = 'var start=new Date().getTime(); db.timeToStartTrigger.insert({_id:1}); while(true) {for(x=0;x<1000;x++) {db["a" + x].insert({a:x})};sleep(1); if((new Date().getTime() - start) > 30000) break; }'; + +//insert enough that resync node has to go through oplog replay in each step +var loadGen = startParallelShell( work, replTest.ports[0] ); + +// wait for document to appear to continue +assert.soon(function() { + try { + return 1 == a_conn.getDB("test")["timeToStartTrigger"].count(); + } catch ( e ) { + print( e ); + return false; + } +}, "waited too long for start trigger"); + +print("*************** STARTING node without data ***************"); +replTest.start(BID); +// check that it is up +assert.soon(function() { + try { + var result = b_conn.getDB("admin").runCommand({replSetGetStatus: 1}); + return true; + } catch ( e ) { + print( e ); + return false; + } +}, "node didn't come up"); + +print("waiting for load generation to finish"); +loadGen(); + +// load must stop before we await replication. +replTest.awaitReplication(); + +// Make sure oplogs match +try { + replTest.ensureOplogsMatch(); +} catch (e) { + var aDBHash = A.runCommand("dbhash"); + var bDBHash = B.runCommand("dbhash"); + assert.eq(aDBHash.md5, bDBHash.md5, + "hashes differ: " + tojson(aDBHash) + " to " + tojson(bDBHash)); +} +replTest.stopSet(); + +print("*****test done******"); diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp index 0e733b7f489..efcb546f2d0 100644 --- a/src/mongo/db/repl/rs.cpp +++ b/src/mongo/db/repl/rs.cpp @@ -1010,14 +1010,13 @@ namespace { } return false; } + void ReplSetImpl::setMinValid(OpTime ts) { + Lock::DBWrite lk( "local" ); + Helpers::putSingleton("local.replset.minvalid", BSON("$set" << BSON("ts" << ts))); + } void ReplSetImpl::setMinValid(BSONObj obj) { - BSONObjBuilder builder; - BSONObjBuilder subobj(builder.subobjStart("$set")); - subobj.appendTimestamp("ts", obj["ts"].date()); - subobj.done(); - Lock::DBWrite lk( "local" ); - Helpers::putSingleton("local.replset.minvalid", builder.obj()); + setMinValid(obj["ts"]._opTime()); } OpTime ReplSetImpl::getMinValid() { diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h index 46bed5b53a1..e9ccbbadc48 100644 --- a/src/mongo/db/repl/rs.h +++ b/src/mongo/db/repl/rs.h @@ -569,12 +569,11 @@ namespace mongo { friend class Consensus; private: - bool _syncDoInitialSync_clone(Cloner &cloner, const char *master, - const list<string>& dbs, bool dataPass); - bool _syncDoInitialSync_applyToHead( replset::SyncTail& syncer, OplogReader* r , - const Member* source, const BSONObj& lastOp, - BSONObj& minValidOut); - void _syncDoInitialSync(); + bool _initialSyncClone(Cloner &cloner, const std::string& master, + const list<string>& dbs, bool dataPass); + bool _initialSyncApplyOplog(replset::SyncTail& syncer, OplogReader* r , + const Member* source); + void _initialSync(); void syncDoInitialSync(); void _syncThread(); void syncTail(); @@ -626,6 +625,7 @@ namespace mongo { * applied. */ static void setMinValid(BSONObj obj); + static void setMinValid(OpTime ts); static OpTime getMinValid(); static void clearInitialSyncFlag(); static bool getInitialSyncFlag(); diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 98369fe801c..dcede45b7db 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -66,7 +66,7 @@ namespace mongo { int failedAttempts = 0; while ( failedAttempts < maxFailedAttempts ) { try { - _syncDoInitialSync(); + _initialSync(); break; } catch(DBException& e) { @@ -81,8 +81,8 @@ namespace mongo { fassert( 16233, failedAttempts < maxFailedAttempts); } - bool ReplSetImpl::_syncDoInitialSync_clone(Cloner& cloner, const char *master, - const list<string>& dbs, bool dataPass) { + bool ReplSetImpl::_initialSyncClone(Cloner& cloner, const std::string& master, + const list<string>& dbs, bool dataPass) { for( list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++ ) { string db = *i; @@ -109,7 +109,7 @@ namespace mongo { options.syncData = dataPass; options.syncIndexes = ! dataPass; - if (!cloner.go(ctx.ctx(), master, options, NULL, err, &errCode)) { + if (!cloner.go(ctx.ctx(), master.c_str(), options, NULL, err, &errCode)) { sethbmsg(str::stream() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db << ". " << (err.empty() ? "" : err + ". ") @@ -266,26 +266,20 @@ namespace mongo { * @param syncer either initial sync (can reclone missing docs) or "normal" sync (no recloning) * @param r the oplog reader * @param source the sync target - * @param lastOp the op to start syncing at. replset::InitialSync writes this and then moves to - * the queue. replset::SyncTail does not write this, it moves directly to the - * queue. - * @param minValid populated by this function. The most recent op on the sync target's oplog, - * this function syncs to this value (inclusive) * @return if applying the oplog succeeded */ - bool ReplSetImpl::_syncDoInitialSync_applyToHead( replset::SyncTail& syncer, OplogReader* r, - const Member* source, const BSONObj& lastOp , - BSONObj& minValid ) { - /* our cloned copy will be strange until we apply oplog events that occurred - through the process. we note that time point here. */ - + bool ReplSetImpl::_initialSyncApplyOplog( replset::SyncTail& syncer, + OplogReader* r, + const Member* source) { + const OpTime startOpTime = lastOpTimeWritten; + BSONObj lastOp; try { // It may have been a long time since we last used this connection to // query the oplog, depending on the size of the databases we needed to clone. // A common problem is that TCP keepalives are set too infrequent, and thus // our connection here is terminated by a firewall due to inactivity. // Solution is to increase the TCP keepalive frequency. - minValid = r->getLastOp(rsoplog); + lastOp = r->getLastOp(rsoplog); } catch ( SocketException & ) { log() << "connection lost to " << source->h().toString() << "; is your tcp keepalive interval set appropriately?"; if( !r->connect(source->h().toString()) ) { @@ -293,42 +287,43 @@ namespace mongo { throw; } // retry - minValid = r->getLastOp(rsoplog); + lastOp = r->getLastOp(rsoplog); } - isyncassert( "getLastOp is empty ", !minValid.isEmpty() ); - - OpTime mvoptime = minValid["ts"]._opTime(); - verify( !mvoptime.isNull() ); + isyncassert( "lastOp is empty ", !lastOp.isEmpty() ); - OpTime startingTS = lastOp["ts"]._opTime(); - verify( mvoptime >= startingTS ); + OpTime stopOpTime = lastOp["ts"]._opTime(); - // apply startingTS..mvoptime portion of the oplog - { - try { - minValid = syncer.oplogApplication(lastOp, minValid); - } - catch (const DBException&) { - log() << "replSet initial sync failed during oplog application phase" << rsLog; + // If we already have what we need then return. + if (stopOpTime == startOpTime) + return true; - emptyOplog(); // otherwise we'll be up! + verify( !stopOpTime.isNull() ); + verify( stopOpTime > startOpTime ); - lastOpTimeWritten = OpTime(); - lastH = 0; + // apply till lastOpTime + try { + syncer.oplogApplication(stopOpTime); + } + catch (const DBException&) { + log() << "replSet initial sync failed during oplog application phase" << rsLog; - log() << "replSet cleaning up [1]" << rsLog; - { - Client::WriteContext cx( "local." ); - cx.ctx().db()->flushFiles(true); - } - log() << "replSet cleaning up [2]" << rsLog; + emptyOplog(); // otherwise we'll be up! - log() << "replSet initial sync failed will try again" << endl; + lastOpTimeWritten = OpTime(); + lastH = 0; - sleepsecs(5); - return false; + log() << "replSet cleaning up [1]" << rsLog; + { + Client::WriteContext cx( "local." ); + cx.ctx().db()->flushFiles(true); } + log() << "replSet cleaning up [2]" << rsLog; + + log() << "replSet initial sync failed will try again" << endl; + + sleepsecs(5); + return false; } return true; @@ -337,7 +332,7 @@ namespace mongo { /** * Do the initial sync for this member. There are several steps to this process: * - * 0. Add _initialSyncFlag to minValid to tell us to restart initial sync if we + * 0. Add _initialSyncFlag to minValid collection to tell us to restart initial sync if we * crash in the middle of this procedure * 1. Record start time. * 2. Clone. @@ -348,14 +343,14 @@ namespace mongo { * 7. Build indexes. * 8. Set minValid3 to sync target's latest op time. * 9. Apply ops from minValid2 to minValid3. - 10. Clean up minValid and remove _initialSyncFlag field + 10. Cleanup minValid collection: remove _initialSyncFlag field, set ts to minValid3 OpTime * * At that point, initial sync is finished. Note that the oplog from the sync target is applied * three times: step 4, 6, and 8. 4 may involve refetching, 6 should not. By the end of 6, * this member should have consistent data. 8 is "cosmetic," it is only to get this member - * closer to the latest op time before it can transition to secondary state. + * closer to the latest op time before it can transition out of startup state */ - void ReplSetImpl::_syncDoInitialSync() { + void ReplSetImpl::_initialSync() { replset::InitialSync init(replset::BackgroundSync::get()); replset::SyncTail tail(replset::BackgroundSync::get()); sethbmsg("initial sync pending",0); @@ -389,14 +384,12 @@ namespace mongo { return; } - // written by applyToHead calls - BSONObj minValid; - if (replSettings.fastsync) { log() << "fastsync: skipping database clone" << rsLog; // prime oplog - init.oplogApplication(lastOp, lastOp); + init.syncApply(lastOp, false); + _logOpObjRS(lastOp); return; } else { @@ -411,7 +404,7 @@ namespace mongo { list<string> dbs = r.conn()->getDatabaseNames(); Cloner cloner; - if (!_syncDoInitialSync_clone(cloner, sourceHostname.c_str(), dbs, true)) { + if (!_initialSyncClone(cloner, r.conn()->getServerAddress(), dbs, true)) { veto(source->fullName(), 600); sleepsecs(300); return; @@ -419,26 +412,26 @@ namespace mongo { sethbmsg("initial sync data copy, starting syncup",0); + // prime oplog + init.syncApply(lastOp, false); + _logOpObjRS(lastOp); + log() << "oplog sync 1 of 3" << endl; - if ( ! _syncDoInitialSync_applyToHead( init, &r , source , lastOp , minValid ) ) { + if (!_initialSyncApplyOplog( init, &r , source)) { return; } - lastOp = minValid; - // Now we sync to the latest op on the sync target _again_, as we may have recloned ops // that were "from the future" compared with minValid. During this second application, // nothing should need to be recloned. log() << "oplog sync 2 of 3" << endl; - if (!_syncDoInitialSync_applyToHead(tail, &r , source , lastOp , minValid)) { + if (!_initialSyncApplyOplog(tail, &r , source)) { return; } // data should now be consistent - lastOp = minValid; - sethbmsg("initial sync building indexes",0); - if (!_syncDoInitialSync_clone(cloner, sourceHostname.c_str(), dbs, false)) { + if (!_initialSyncClone(cloner, r.conn()->getServerAddress(), dbs, false)) { veto(source->fullName(), 600); sleepsecs(300); return; @@ -446,7 +439,7 @@ namespace mongo { } log() << "oplog sync 3 of 3" << endl; - if (!_syncDoInitialSync_applyToHead(tail, &r, source, lastOp, minValid)) { + if (!_initialSyncApplyOplog(tail, &r, source)) { return; } @@ -466,13 +459,13 @@ namespace mongo { Client::WriteContext cx( "local." ); cx.ctx().db()->flushFiles(true); try { - log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog; + log() << "replSet set minValid=" << lastOpTimeWritten << rsLog; } catch(...) { } // Initial sync is now complete. Flag this by setting minValid to the last thing // we synced. - theReplSet->setMinValid(minValid); + theReplSet->setMinValid(lastOpTimeWritten); // Clear the initial sync flag. theReplSet->clearInitialSyncFlag(); diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index a51565948d7..0f993a3bb87 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -87,8 +87,10 @@ namespace replset { SyncTail::SyncTail(BackgroundSyncInterface *q) : - Sync(""), oplogVersion(0), _networkQueue(q) - {} + Sync(""), oplogVersion(0), _networkQueue(q), _applyFunc(multiSyncApply) {} + + SyncTail::SyncTail(BackgroundSyncInterface *q, MultiSyncApplyFunc func) : + Sync(""), oplogVersion(0), _networkQueue(q), _applyFunc(func) {} SyncTail::~SyncTail() {} @@ -244,23 +246,21 @@ namespace replset { } // Doles out all the work to the writer pool threads and waits for them to complete - void SyncTail::applyOps(const std::vector< std::vector<BSONObj> >& writerVectors, - MultiSyncApplyFunc applyFunc) { + void SyncTail::applyOps(const std::vector< std::vector<BSONObj> >& writerVectors) { ThreadPool& writerPool = theReplSet->getWriterPool(); TimerHolder timer(&applyBatchStats); for (std::vector< std::vector<BSONObj> >::const_iterator it = writerVectors.begin(); it != writerVectors.end(); ++it) { if (!it->empty()) { - writerPool.schedule(applyFunc, boost::cref(*it), this); + writerPool.schedule(_applyFunc, boost::cref(*it), this); } } writerPool.join(); } // Doles out all the work to the writer pool threads and waits for them to complete - void SyncTail::multiApply( std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc ) { - + void SyncTail::multiApply( std::deque<BSONObj>& ops) { // Use a ThreadPool to prefetch all the operations in a batch. prefetchOps(ops); @@ -275,7 +275,7 @@ namespace replset { // stop all readers until we're done Lock::ParallelBatchWriterMode pbwm; - applyOps(writerVectors, applyFunc); + applyOps(writerVectors); } @@ -297,88 +297,24 @@ namespace replset { InitialSync::InitialSync(BackgroundSyncInterface *q) : - SyncTail(q) {} + SyncTail(q, multiInitialSyncApply) {} InitialSync::~InitialSync() {} - BSONObj SyncTail::oplogApplySegment(const BSONObj& applyGTEObj, const BSONObj& minValidObj, - MultiSyncApplyFunc func) { - OpTime applyGTE = applyGTEObj["ts"]._opTime(); - OpTime minValid = minValidObj["ts"]._opTime(); - - // We have to keep track of the last op applied to the data, because there's no other easy - // way of getting this data synchronously. Batches may go past minValidObj, so we need to - // know to bump minValid past minValidObj. - BSONObj lastOp = applyGTEObj; - OpTime ts = applyGTE; - - time_t start = time(0); - time_t now = start; - - unsigned long long n = 0, lastN = 0; - - while( ts < minValid ) { - OpQueue ops; - - while (ops.getSize() < replBatchLimitBytes) { - if (tryPopAndWaitForMore(&ops)) { - break; - } - - // apply replication batch limits - now = time(0); - if (!ops.empty()) { - if (now > replBatchLimitSeconds) - break; - if (ops.getDeque().size() > replBatchLimitOperations) - break; - } - } - setOplogVersion(ops.getDeque().front()); - - multiApply(ops.getDeque(), func); - - n += ops.getDeque().size(); - - if ( n > lastN + 1000 ) { - if (now - start > 10) { - // simple progress metering - log() << "replSet initialSyncOplogApplication applied " << n << " operations, synced to " - << ts.toStringPretty() << rsLog; - start = now; - lastN = n; - } - } - - // we want to keep a record of the last op applied, to compare with minvalid - lastOp = ops.getDeque().back(); - OpTime tempTs = lastOp["ts"]._opTime(); - applyOpsToOplog(&ops.getDeque()); - - ts = tempTs; - } - - return lastOp; - } - /* initial oplog application, during initial sync, after cloning. */ - BSONObj InitialSync::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) { + void InitialSync::oplogApplication(const OpTime& endOpTime) { if (replSetForceInitialSyncFailure > 0) { - log() << "replSet test code invoked, forced InitialSync failure: " << replSetForceInitialSyncFailure << rsLog; + log() << "replSet test code invoked, forced InitialSync failure: " + << replSetForceInitialSyncFailure << rsLog; replSetForceInitialSyncFailure--; throw DBException("forced error",0); } - - // create the initial oplog entry - syncApply(applyGTEObj); - _logOpObjRS(applyGTEObj); - - return oplogApplySegment(applyGTEObj, minValidObj, multiInitialSyncApply); + _applyOplogUntil(endOpTime); } - BSONObj SyncTail::oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj) { - return oplogApplySegment(applyGTEObj, minValidObj, multiSyncApply); + void SyncTail::oplogApplication(const OpTime& endOpTime) { + _applyOplogUntil(endOpTime); } void SyncTail::setOplogVersion(const BSONObj& op) { @@ -394,6 +330,56 @@ namespace replset { } } + void SyncTail::_applyOplogUntil(const OpTime& endOpTime) { + while (true) { + OpQueue ops; + + verify( !Lock::isLocked() ); + + while (!tryPopAndWaitForMore(&ops)) { + // nothing came back last time, so go again + if (ops.empty()) continue; + + // Check if we reached the end + const BSONObj currentOp = ops.back(); + const OpTime currentOpTime = currentOp["ts"]._opTime(); + + // When we reach the end return this batch + if (currentOpTime == endOpTime) { + break; + } + else if (currentOpTime > endOpTime) { + severe() << "Applied past expected end " << endOpTime << " to " << currentOpTime + << " without seeing it. Rollback?" << rsLog; + fassertFailedNoTrace(0); + } + + // apply replication batch limits + if (ops.getSize() > replBatchLimitBytes) + break; + if (ops.getDeque().size() > replBatchLimitOperations) + break; + }; + + if (ops.empty()) { + severe() << "got no ops for batch..."; + fassertFailedNoTrace(0); + } + + const BSONObj lastOp = ops.back().getOwned(); + + multiApply(ops.getDeque()); + applyOpsToOplog(&ops.getDeque()); + + setOplogVersion(lastOp); + + // if the last op applied was our end, return + if (theReplSet->lastOpTimeWritten == endOpTime) { + return; + } + } // end of while (true) + } + /* tail an oplog. ok to return, will be re-called. */ void SyncTail::oplogApplication() { while( 1 ) { @@ -458,7 +444,7 @@ namespace replset { const int slaveDelaySecs = theReplSet->myConfig().slaveDelay; if (!ops.empty() && slaveDelaySecs > 0) { - const BSONObj& lastOp = ops.getDeque().back(); + const BSONObj& lastOp = ops.back(); const unsigned int opTimestampSecs = lastOp["ts"]._opTime().getSecs(); // Stop the batch as the lastOp is too new to be applied. If we continue @@ -479,7 +465,7 @@ namespace replset { sleepmillis(0); } - const BSONObj& lastOp = ops.getDeque().back(); + const BSONObj& lastOp = ops.back(); setOplogVersion(lastOp); handleSlaveDelay(lastOp); @@ -488,7 +474,7 @@ namespace replset { // if we should crash and restart before updating the oplog theReplSet->setMinValid(lastOp); - multiApply(ops.getDeque(), multiSyncApply); + multiApply(ops.getDeque()); applyOpsToOplog(&ops.getDeque()); @@ -526,7 +512,7 @@ namespace replset { return true; } - const char* ns = op["ns"].valuestrsafe(); + const char* ns = op["ns"].valuestrsafe(); // check for commands if ((op["op"].valuestrsafe()[0] == 'c') || diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h index 392cbf5bd5b..4fe09072064 100644 --- a/src/mongo/db/repl/rs_sync.h +++ b/src/mongo/db/repl/rs_sync.h @@ -49,32 +49,15 @@ namespace replset { typedef void (*MultiSyncApplyFunc)(const std::vector<BSONObj>& ops, SyncTail* st); public: SyncTail(BackgroundSyncInterface *q); + SyncTail(BackgroundSyncInterface *q, MultiSyncApplyFunc func); + virtual ~SyncTail(); virtual bool syncApply(const BSONObj &o, bool convertUpdateToUpsert = false); /** - * Apply ops from applyGTEObj's ts to at least minValidObj's ts. Note that, due to - * batching, this may end up applying ops beyond minValidObj's ts. - * - * @param applyGTEObj the op to start replicating at. This is actually not used except in - * comparison to minValidObj: the background sync thread keeps its own - * record of where we're synced to and starts providing ops from that - * point. - * @param minValidObj the op to finish syncing at. This function cannot return (other than - * fatally erroring out) without applying at least this op. - * @param func whether this should use initial sync logic (recloning docs) or - * "normal" logic. - * @return BSONObj the op that was synced to. This may be greater than minValidObj, as a - * single batch might blow right by minvalid. If applyGTEObj is the same - * op as minValidObj, this will be applyGTEObj. - */ - BSONObj oplogApplySegment(const BSONObj& applyGTEObj, const BSONObj& minValidObj, - MultiSyncApplyFunc func); - - /** - * Runs oplogApplySegment without allowing recloning documents. + * Runs _applyOplogUntil(stopOpTime) */ - virtual BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj); + virtual void oplogApplication(const OpTime& stopOpTime); void oplogApplication(); bool peek(BSONObj* obj); @@ -91,6 +74,11 @@ namespace replset { bool empty() { return _deque.empty(); } + BSONObj back(){ + verify(!_deque.empty()); + return _deque.back(); + } + private: std::deque<BSONObj> _deque; size_t _size; @@ -113,9 +101,18 @@ namespace replset { static const int replBatchLimitSeconds = 1; static const unsigned int replBatchLimitOperations = 5000; - // Prefetch and write a deque of operations, using the supplied function. - // Initial Sync and Sync Tail each use a different function. - void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc); + // Prefetch and write a deque of operations. + void multiApply(std::deque<BSONObj>& ops); + + /** + * Applies oplog entries until reaching "endOpTime". + * + * Returns the OpTime from the last doc applied + * + * NOTE:Will not transition or check states + */ + void _applyOplogUntil(const OpTime& endOpTime); + // The version of the last op to be read int oplogVersion; @@ -123,14 +120,16 @@ namespace replset { private: BackgroundSyncInterface* _networkQueue; + // Function to use during applyOps + MultiSyncApplyFunc _applyFunc; + // Doles out all the work to the reader pool threads and waits for them to complete void prefetchOps(const std::deque<BSONObj>& ops); // Used by the thread pool readers to prefetch an op static void prefetchOp(const BSONObj& op); // Doles out all the work to the writer pool threads and waits for them to complete - void applyOps(const std::vector< std::vector<BSONObj> >& writerVectors, - MultiSyncApplyFunc applyFunc); + void applyOps(const std::vector< std::vector<BSONObj> >& writerVectors); void fillWriterVectors(const std::deque<BSONObj>& ops, std::vector< std::vector<BSONObj> >* writerVectors); @@ -145,12 +144,8 @@ namespace replset { public: virtual ~InitialSync(); InitialSync(BackgroundSyncInterface *q); + virtual void oplogApplication(const OpTime& stopOpTime); - /** - * Creates the initial oplog entry: applies applyGTEObj and writes it to the oplog. Then - * this runs oplogApplySegment allowing recloning documents. - */ - BSONObj oplogApplication(const BSONObj& applyGTEObj, const BSONObj& minValidObj); }; // TODO: move hbmsg into an error-keeping class (SERVER-4444) diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index b0a05b70229..b408009d875 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -816,7 +816,90 @@ ReplSetTest.prototype.stopSet = function( signal , forRestart, opts ) { print('ReplSetTest stopSet *** Shut down repl set - test worked ****' ) }; +/** + * Walks all oplogs and ensures matching entries. + */ +ReplSetTest.prototype.ensureOplogsMatch = function() { + "use strict"; + var OplogReader = function(mongo) { + this.next = function() { + if (!this.cursor) + throw Error("reader is not open!"); + + var nextDoc = this.cursor.next(); + if (nextDoc) + this.lastDoc = nextDoc; + return nextDoc; + }; + + this.getLastDoc = function() { + if (this.lastDoc) + return this.lastDoc; + return this.next(); + }; + + this.hasNext = function() { + if (!this.cursor) + throw Error("reader is not open!"); + return this.cursor.hasNext(); + }; + + this.query = function(ts) { + var coll = this.getOplogColl(); + var query = {"ts": {"$gte": ts ? ts : new Timestamp()}}; + this.cursor = coll.find(query).sort({$natural:1}) + this.cursor.addOption(DBQuery.Option.oplogReplay); + }; + + this.getFirstDoc = function(){ + return this.getOplogColl().find().sort({$natural:1}).limit(-1).next(); + }; + + this.getOplogColl = function () { + return this.mongo.getDB("local")["oplog.rs"]; + } + + this.lastDoc = null; + this.cursor = null; + this.mongo = mongo; + }; + + if (this.nodes.length && this.nodes.length > 1) { + var readers = []; + var largestTS = null; + var nodes = this.nodes; + var rsSize = nodes.length; + for (var i = 0; i < rsSize; i++) { + readers[i] = new OplogReader(nodes[i]); + var currTS = readers[i].getFirstDoc()["ts"]; + if (currTS.t > largestTS.t || (currTS.t == largestTS.t && currTS.i > largestTS.i) ) { + largestTS = currTS; + } + } + + // start all oplogReaders at the same place. + for (var i = 0; i < rsSize; i++) { + readers[i].query(largestTS); + } + var firstReader = readers[0]; + while (firstReader.hasNext()) { + var ts = firstReader.next()["ts"]; + for(var i = 1; i < rsSize; i++) { + assert.eq(ts, + readers[i].next()["ts"], + " non-matching ts for node: " + readers[i].mongo); + } + } + + // ensure no other node has more oplog + for (var i = 1; i < rsSize; i++) { + assert.eq(false, + readers[i].hasNext(), + "" + readers[i] + " shouldn't have more oplog."); + } + } +} /** * Waits until there is a master node */ |