summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2012-10-03 11:32:40 -0400
committerEric Milkie <milkie@10gen.com>2012-10-04 10:53:02 -0400
commitee081e1df9792e6435568b16d4710a9443ca3657 (patch)
treef45f2290fd2c53dc8c51df7ae427d839ac2615f7
parente0c65e3f6b8c57c3eb873c607a029dcb05b99e5f (diff)
downloadmongo-ee081e1df9792e6435568b16d4710a9443ca3657.tar.gz
SERVER-6671 add a 'v' version field to each oplog document
The version field will allow us to detect the primary's version. We need to know which version because only newer primary oplog streams should prevent a secondary from enforcing unique index constraints in initial sync or recovering states. The version field will also be useful in the future when we want to make schema changes in the oplog.
-rw-r--r--src/mongo/db/oplog.cpp41
-rw-r--r--src/mongo/db/repl/rs.cpp1
-rw-r--r--src/mongo/db/repl/rs.h21
-rw-r--r--src/mongo/db/repl/rs_sync.cpp19
-rw-r--r--src/mongo/db/repl/rs_sync.h1
5 files changed, 53 insertions, 30 deletions
diff --git a/src/mongo/db/oplog.cpp b/src/mongo/db/oplog.cpp
index d7f87074c5b..fc5678badde 100644
--- a/src/mongo/db/oplog.cpp
+++ b/src/mongo/db/oplog.cpp
@@ -124,10 +124,30 @@ namespace mongo {
*b = EOO;
}
+ /* we write to local.oplog.rs:
+ { ts : ..., h: ..., v: ..., op: ..., ns: ..., o: ... }
+ ts: an OpTime timestamp
+ h: hash
+ v: version
+ op:
+ "i" insert
+ "u" update
+ "d" delete
+ "c" db cmd
+ "db" declares presence of a database (ns is set to the db name + '.')
+ "n" no op
+ logNS: always null! DEPRECATED
+ bb:
+ if not null, specifies a boolean to pass along to the other side as b: param.
+ used for "justOne" or "upsert" flags on 'd', 'u'
+
+ */
+
// global is safe as we are in write lock. we put the static outside the function to avoid the implicit mutex
// the compiler would use if inside the function. the reason this is static is to avoid a malloc/free for this
// on every logop call.
static BufBuilder logopbufbuilder(8*1024);
+ const static int OPLOG_VERSION = 2;
static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb, bool fromMigrate ) {
Lock::DBWrite lk1("local");
@@ -159,6 +179,7 @@ namespace mongo {
BSONObjBuilder b(logopbufbuilder);
b.appendTimestamp("ts", ts.asDate());
b.append("h", hashNew);
+ b.append("v", OPLOG_VERSION);
b.append("op", opstr);
b.append("ns", ns);
if (fromMigrate)
@@ -205,26 +226,6 @@ namespace mongo {
}
}
- /* we write to local.oplog.$main:
- { ts : ..., op: ..., ns: ..., o: ... }
- ts: an OpTime timestamp
- op:
- "i" insert
- "u" update
- "d" delete
- "c" db cmd
- "db" declares presence of a database (ns is set to the db name + '.')
- "n" no op
- logNS: where to log it. 0/null means "local.oplog.$main".
- bb:
- if not null, specifies a boolean to pass along to the other side as b: param.
- used for "justOne" or "upsert" flags on 'd', 'u'
- first: true
- when set, indicates this is the first thing we have logged for this database.
- thus, the slave does not need to copy down all the data when it sees this.
-
- note this is used for single collection logging even when --replSet is enabled.
- */
static void _logOpOld(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb, bool fromMigrate ) {
Lock::DBWrite lk("local");
static BufBuilder bufbuilder(8*1024); // todo there is likely a mutex on this constructor
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
index a5f567ae014..b574d79f8c6 100644
--- a/src/mongo/db/repl/rs.cpp
+++ b/src/mongo/db/repl/rs.cpp
@@ -423,6 +423,7 @@ namespace mongo {
ghost(0),
_writerPool(replWriterThreadCount),
_prefetcherPool(replPrefetcherThreadCount),
+ oplogVersion(0),
_indexPrefetchConfig(PREFETCH_ALL) {
}
diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h
index 2feef015c60..019fc64965f 100644
--- a/src/mongo/db/repl/rs.h
+++ b/src/mongo/db/repl/rs.h
@@ -540,6 +540,8 @@ namespace mongo {
void syncThread();
const OpTime lastOtherOpTime() const;
static void setMinValid(BSONObj obj);
+
+ int oplogVersion;
private:
IndexPrefetchConfig _indexPrefetchConfig;
};
@@ -676,15 +678,18 @@ namespace mongo {
if (!theReplSet) return false;
// see SERVER-6671
MemberState ms = theReplSet->state();
- if ((ms == MemberState::RS_STARTUP2) ||
- (ms == MemberState::RS_RECOVERING) ||
- (ms == MemberState::RS_ROLLBACK)) {
- // Never ignore _id index
- if (!idx.isIdIndex()) {
- return true;
- }
+ if (! ((ms == MemberState::RS_STARTUP2) ||
+ (ms == MemberState::RS_RECOVERING) ||
+ (ms == MemberState::RS_ROLLBACK))) {
+ return false;
}
- return false;
+ // 2 is the oldest oplog version where operations
+ // are fully idempotent.
+ if (theReplSet->oplogVersion < 2) return false;
+ // Never ignore _id index
+ if (idx.isIdIndex()) return false;
+
+ return true;
}
}
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index bd7d24c6742..0b12cac7af2 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -262,8 +262,8 @@ namespace replset {
break;
}
}
-
-
+ setOplogVersion(ops.getDeque().front());
+
multiApply(ops.getDeque(), func);
n += ops.getDeque().size();
@@ -310,6 +310,19 @@ namespace replset {
return oplogApplySegment(applyGTEObj, minValidObj, multiSyncApply);
}
+ void SyncTail::setOplogVersion(const BSONObj& op) {
+ BSONElement version = op["v"];
+ // old primaries do not get the unique index ignoring feature
+ // because some of their ops are not imdepotent, see
+ // SERVER-7186
+ if (version.eoo()) {
+ theReplSet->oplogVersion = 1;
+ RARELY log() << "warning replset primary is an older version than we are; upgrade recommended" << endl;
+ } else {
+ theReplSet->oplogVersion = version.Int();
+ }
+ }
+
/* tail an oplog. ok to return, will be re-called. */
void SyncTail::oplogApplication() {
while( 1 ) {
@@ -378,6 +391,7 @@ namespace replset {
}
const BSONObj& lastOp = ops.getDeque().back();
+ setOplogVersion(lastOp);
handleSlaveDelay(lastOp);
// Set minValid to the last op to be applied in this next batch.
@@ -414,6 +428,7 @@ namespace replset {
// otherwise, apply what we have
return true;
}
+
// check for commands
if ((op["op"].valuestrsafe()[0] == 'c') ||
// Index builds are acheived through the use of an insert op, not a command op.
diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h
index 1e52380b458..47f9c6886fc 100644
--- a/src/mongo/db/repl/rs_sync.h
+++ b/src/mongo/db/repl/rs_sync.h
@@ -120,6 +120,7 @@ namespace replset {
void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors);
void handleSlaveDelay(const BSONObj& op);
+ void setOplogVersion(const BSONObj& op);
};
/**