summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r--src/mongo/db/repl/oplog.cpp56
1 files changed, 30 insertions, 26 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index c32599695ed..eed93e1561a 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -104,7 +104,7 @@ namespace {
// Synchronizes the section where a new Timestamp is generated and when it actually
// appears in the oplog.
mongo::mutex newOpMutex;
- boost::condition newOptimeNotifier;
+ boost::condition newTimestampNotifier;
static std::string _oplogCollectionName;
@@ -125,20 +125,25 @@ namespace {
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
- std::pair<Timestamp, long long> getNextOpTime(OperationContext* txn,
+ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
Collection* oplog,
const char* ns,
ReplicationCoordinator* replCoord,
const char* opstr) {
boost::lock_guard<boost::mutex> lk(newOpMutex);
Timestamp ts = getNextGlobalTimestamp();
- newOptimeNotifier.notify_all();
+ newTimestampNotifier.notify_all();
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
- long long hashNew;
+ long long hashNew = 0;
+ long long term = 0;
+ // Set hash and term if we're in replset mode, otherwise they remain 0 in master/slave.
if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) {
+ // Current term. If we're not a replset of pv=1, it could be the default value (0) or
+ // the last valid term before downgrade.
+ term = ReplClientInfo::forClient(txn->getClient()).getTerm();
hashNew = BackgroundSync::get()->getLastAppliedHash();
@@ -146,7 +151,6 @@ namespace {
if (*opstr == 'n') {
// 'n' operations are always logged
invariant(*ns == '\0');
-
// 'n' operations do not advance the hash, since they are not rolled back
}
else {
@@ -156,13 +160,10 @@ namespace {
BackgroundSync::get()->setLastAppliedHash(hashNew);
}
}
- else {
- hashNew = 0;
- }
- // TODO(siyuan) Use current term
- replCoord->setMyLastOptime(OpTime(ts, 0));
- return std::pair<Timestamp,long long>(ts, hashNew);
+ OpTime opTime(ts, term);
+ replCoord->setMyLastOptime(opTime);
+ return std::pair<OpTime,long long>(opTime, hashNew);
}
/**
@@ -284,7 +285,7 @@ namespace {
_localOplogCollection);
}
- std::pair<Timestamp, long long> slot = getNextOpTime(txn,
+ std::pair<OpTime, long long> slot = getNextOpTime(txn,
_localOplogCollection,
ns,
replCoord,
@@ -295,7 +296,8 @@ namespace {
*/
BSONObjBuilder b(256);
- b.append("ts", slot.first);
+ b.append("ts", slot.first.getTimestamp());
+ b.append("t", slot.first.getTerm());
b.append("h", slot.second);
b.append("v", OPLOG_VERSION);
b.append("op", opstr);
@@ -312,8 +314,7 @@ namespace {
OplogDocWriter writer( partial, obj );
checkOplogInsert( _localOplogCollection->insertDocument( txn, &writer, false ) );
- // TODO(siyuan) set term when logging ops.
- ReplClientInfo::forClient(txn->getClient()).setLastOp( OpTime(slot.first, 0) );
+ ReplClientInfo::forClient(txn->getClient()).setLastOp( slot.first );
}
OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
@@ -344,9 +345,7 @@ namespace {
it != ops.end();
++it) {
const BSONObj& op = *it;
- const Timestamp ts = op["ts"].timestamp();
- // TODO(siyuan) Parse "term" and fill this out
- const OpTime optime = OpTime(ts, 0);
+ const OpTime optime = extractOpTime(op);
checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false));
@@ -396,7 +395,7 @@ namespace {
}
if ( !rs )
- initOpTimeFromOplog(txn, _oplogCollectionName);
+ initTimestampFromOplog(txn, _oplogCollectionName);
return;
}
@@ -870,19 +869,24 @@ namespace {
boost::unique_lock<boost::mutex> lk(newOpMutex);
while (referenceTime == getLastSetTimestamp()) {
- if (boost::cv_status::timeout == newOptimeNotifier.wait_for(lk, timeout))
+ if (boost::cv_status::timeout == newTimestampNotifier.wait_for(lk, timeout))
return;
}
}
- // TODO(siyuan) Change to OpTime after adding term to oplog.
- void setNewOptime(const Timestamp& newTime) {
+ void setNewTimestamp(const Timestamp& newTime) {
boost::lock_guard<boost::mutex> lk(newOpMutex);
setGlobalTimestamp(newTime);
- newOptimeNotifier.notify_all();
+ newTimestampNotifier.notify_all();
+ }
+
+ OpTime extractOpTime(const BSONObj& op) {
+ const Timestamp ts = op["ts"].timestamp();
+ const long long term = op["t"].numberLong(); // Default to 0 if it's absent
+ return OpTime(ts, term);
}
- void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) {
+ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
DBDirectClient c(txn);
BSONObj lastOp = c.findOne(oplogNS,
Query().sort(reverseNaturalObj),
@@ -890,8 +894,8 @@ namespace {
QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
- LOG(1) << "replSet setting last OpTime";
- setNewOptime(lastOp[ "ts" ].timestamp());
+ LOG(1) << "replSet setting last Timestamp";
+ setNewTimestamp(lastOp[ "ts" ].timestamp());
}
}