diff options
-rw-r--r-- | jstests/replsets/replset9.js | 52 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 12 |
2 files changed, 63 insertions, 1 deletions
diff --git a/jstests/replsets/replset9.js b/jstests/replsets/replset9.js new file mode 100644 index 00000000000..e3912877405 --- /dev/null +++ b/jstests/replsets/replset9.js @@ -0,0 +1,52 @@ + + +var rt = new ReplSetTest( { name : "replset9tests" , nodes: 1, oplogSize: 400 } ); + +var nodes = rt.startSet(); +rt.initiate(); +var master = rt.getMaster(); +var bigstring = "a"; +var md = master.getDB( 'd' ); +var mdc = md[ 'c' ]; + +// idea: while cloner is running, update some docs and then immediately remove them. +// oplog will have ops referencing docs that no longer exist. + +var doccount = 20000; +// Avoid empty extent issues +mdc.insert( { _id:-1, x:"dummy" } ); + +// Make this db big so that cloner takes a while. +print ("inserting bigstrings"); +for( i = 0; i < doccount; ++i ) { + mdc.insert( { _id:i, x:bigstring } ); + bigstring += "a"; +} +md.getLastError(); + +// Insert some docs to update and remove +print ("inserting x"); +for( i = doccount; i < doccount*2; ++i ) { + mdc.insert( { _id:i, bs:bigstring, x:i } ); +} +md.getLastError(); + +// add a secondary; start cloning +var slave = rt.add(); +rt.reInitiate(); +print ("initiation complete!"); +var sc = slave.getDB( 'd' )[ 'c' ]; +slave.setSlaveOk(); + +print ("updating and deleting documents"); +for (i = doccount*2; i > doccount; --i) { + mdc.update( { _id:i }, { $inc: { x : 1 } } ); + md.getLastError(); + mdc.remove( { _id:i } ); + md.getLastError(); + mdc.insert( { bs:bigstring } ); + md.getLastError(); +} +print ("finished"); +// Wait for replication to catch up. +rt.awaitReplication(640000); diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 7c49b89ad70..3c71b075bd3 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -257,6 +257,8 @@ namespace replset { OpTime ts = applyGTE; time_t start = time(0); + time_t now = start; + unsigned long long n = 0, lastN = 0; while( ts < minValid ) { @@ -266,6 +268,15 @@ namespace replset { if (tryPopAndWaitForMore(&ops)) { break; } + + // apply replication batch limits + now = time(0); + if (!ops.empty()) { + if (now > replBatchLimitSeconds) + break; + if (ops.getDeque().size() > replBatchLimitOperations) + break; + } } setOplogVersion(ops.getDeque().front()); @@ -274,7 +285,6 @@ namespace replset { n += ops.getDeque().size(); if ( n > lastN + 1000 ) { - time_t now = time(0); if (now - start > 10) { // simple progress metering log() << "replSet initialSyncOplogApplication applied " << n << " operations, synced to " |