summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2012-10-04 18:04:31 -0400
committerEric Milkie <milkie@10gen.com>2012-10-05 09:49:57 -0400
commitedd2fa6fea3f52e1e9ac06f26a7f60d5758adae9 (patch)
treef3c007a76fc8a0644aeea83808773ff436c73513
parent8e660532455d1165ff039068336c3dcb1cd5ad5f (diff)
downloadmongo-edd2fa6fea3f52e1e9ac06f26a7f60d5758adae9.tar.gz
SERVER-6671 end batch early if oplog version change is detected
-rw-r--r--src/mongo/db/repl/rs_sync.cpp24
-rw-r--r--src/mongo/db/repl/rs_sync.h3
-rw-r--r--src/mongo/dbtests/replsettests.cpp18
3 files changed, 43 insertions, 2 deletions
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 0b12cac7af2..90d3a252754 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -39,7 +39,7 @@ namespace mongo {
namespace replset {
SyncTail::SyncTail(BackgroundSyncInterface *q) :
- Sync(""), _networkQueue(q)
+ Sync(""), oplogVersion(0), _networkQueue(q)
{}
SyncTail::~SyncTail() {}
@@ -444,6 +444,28 @@ namespace replset {
return true;
}
+ // check for oplog version change
+ BSONElement elemVersion = op["v"];
+ int curVersion = 0;
+ if (elemVersion.eoo())
+ // missing version means version 1
+ curVersion = 1;
+ else
+ curVersion = elemVersion.Int();
+
+ if (curVersion != oplogVersion) {
+ // Version changes cause us to end a batch.
+ // If we are starting a new batch, reset version number
+ // and continue.
+ if (ops->empty()) {
+ oplogVersion = curVersion;
+ }
+ else {
+ // End batch early
+ return true;
+ }
+ }
+
// Copy the op to the deque and remove it from the bgsync queue.
ops->push_back(op);
_networkQueue->consume();
diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h
index 47f9c6886fc..da0a66069d9 100644
--- a/src/mongo/db/repl/rs_sync.h
+++ b/src/mongo/db/repl/rs_sync.h
@@ -105,6 +105,9 @@ namespace replset {
// Initial Sync and Sync Tail each use a different function.
void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc);
+ // The version of the last op to be read
+ int oplogVersion;
+
private:
BackgroundSyncInterface* _networkQueue;
diff --git a/src/mongo/dbtests/replsettests.cpp b/src/mongo/dbtests/replsettests.cpp
index 74343af2b07..4711e098f20 100644
--- a/src/mongo/dbtests/replsettests.cpp
+++ b/src/mongo/dbtests/replsettests.cpp
@@ -407,7 +407,8 @@ namespace ReplSetTests {
class TestRSSync : public Base {
- void addOp(const string& op, BSONObj o, BSONObj* o2 = 0, const char* coll = 0) {
+ void addOp(const string& op, BSONObj o, BSONObj* o2 = NULL, const char* coll = NULL,
+ int version = 0) {
OpTime ts;
{
Lock::GlobalWrite lk;
@@ -416,6 +417,9 @@ namespace ReplSetTests {
BSONObjBuilder b;
b.appendTimestamp("ts", ts.asLL());
+ if (version != 0) {
+ b.append("v", version);
+ }
b.append("op", op);
b.append("o", o);
@@ -439,6 +443,12 @@ namespace ReplSetTests {
}
}
+ void addVersionedInserts(int expected) {
+ for (int i=0; i < expected; i++) {
+ addOp("i", BSON("_id" << i << "x" << 789), NULL, NULL, i);
+ }
+ }
+
void addUpdates() {
BSONObj id = BSON("_id" << "123456something");
addOp("i", id);
@@ -475,6 +485,12 @@ namespace ReplSetTests {
ASSERT_EQUALS(expected, static_cast<int>(client()->count(ns())));
drop();
+ addVersionedInserts(100);
+ applyOplog();
+
+ ASSERT_EQUALS(expected, static_cast<int>(client()->count(ns())));
+
+ drop();
addUpdates();
applyOplog();