diff options
author | Eric Milkie <milkie@10gen.com> | 2012-10-04 18:04:31 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2012-10-05 09:49:57 -0400 |
commit | edd2fa6fea3f52e1e9ac06f26a7f60d5758adae9 (patch) | |
tree | f3c007a76fc8a0644aeea83808773ff436c73513 | |
parent | 8e660532455d1165ff039068336c3dcb1cd5ad5f (diff) | |
download | mongo-edd2fa6fea3f52e1e9ac06f26a7f60d5758adae9.tar.gz |
SERVER-6671 end batch early if oplog version change is detected
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.h | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/replsettests.cpp | 18 |
3 files changed, 43 insertions, 2 deletions
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 0b12cac7af2..90d3a252754 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -39,7 +39,7 @@ namespace mongo { namespace replset { SyncTail::SyncTail(BackgroundSyncInterface *q) : - Sync(""), _networkQueue(q) + Sync(""), oplogVersion(0), _networkQueue(q) {} SyncTail::~SyncTail() {} @@ -444,6 +444,28 @@ namespace replset { return true; } + // check for oplog version change + BSONElement elemVersion = op["v"]; + int curVersion = 0; + if (elemVersion.eoo()) + // missing version means version 1 + curVersion = 1; + else + curVersion = elemVersion.Int(); + + if (curVersion != oplogVersion) { + // Version changes cause us to end a batch. + // If we are starting a new batch, reset version number + // and continue. + if (ops->empty()) { + oplogVersion = curVersion; + } + else { + // End batch early + return true; + } + } + // Copy the op to the deque and remove it from the bgsync queue. ops->push_back(op); _networkQueue->consume(); diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h index 47f9c6886fc..da0a66069d9 100644 --- a/src/mongo/db/repl/rs_sync.h +++ b/src/mongo/db/repl/rs_sync.h @@ -105,6 +105,9 @@ namespace replset { // Initial Sync and Sync Tail each use a different function. void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc); + // The version of the last op to be read + int oplogVersion; + private: BackgroundSyncInterface* _networkQueue; diff --git a/src/mongo/dbtests/replsettests.cpp b/src/mongo/dbtests/replsettests.cpp index 74343af2b07..4711e098f20 100644 --- a/src/mongo/dbtests/replsettests.cpp +++ b/src/mongo/dbtests/replsettests.cpp @@ -407,7 +407,8 @@ namespace ReplSetTests { class TestRSSync : public Base { - void addOp(const string& op, BSONObj o, BSONObj* o2 = 0, const char* coll = 0) { + void addOp(const string& op, BSONObj o, BSONObj* o2 = NULL, const char* coll = NULL, + int version = 0) { OpTime ts; { Lock::GlobalWrite lk; @@ -416,6 +417,9 @@ namespace ReplSetTests { BSONObjBuilder b; b.appendTimestamp("ts", ts.asLL()); + if (version != 0) { + b.append("v", version); + } b.append("op", op); b.append("o", o); @@ -439,6 +443,12 @@ namespace ReplSetTests { } } + void addVersionedInserts(int expected) { + for (int i=0; i < expected; i++) { + addOp("i", BSON("_id" << i << "x" << 789), NULL, NULL, i); + } + } + void addUpdates() { BSONObj id = BSON("_id" << "123456something"); addOp("i", id); @@ -475,6 +485,12 @@ namespace ReplSetTests { ASSERT_EQUALS(expected, static_cast<int>(client()->count(ns()))); drop(); + addVersionedInserts(100); + applyOplog(); + + ASSERT_EQUALS(expected, static_cast<int>(client()->count(ns()))); + + drop(); addUpdates(); applyOplog(); |