author     Siyuan Zhou <siyuan.zhou@mongodb.com>    2015-05-19 15:10:31 -0400
committer  Siyuan Zhou <siyuan.zhou@mongodb.com>    2015-05-26 17:23:02 -0400
commit     11237ffbb01dcfc810dccae6029d569afa4473db (patch)
tree       9c4ba5fbf2ee32bce958d426b6627b7a0fb102fb /src
parent     084e41d202d6757504d2cc338f4c0fe0cfe8babb (diff)
SERVER-18216 Add term to oplog.
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/SConscript | 1
-rw-r--r--  src/mongo/db/repl/bgsync.cpp | 33
-rw-r--r--  src/mongo/db/repl/bgsync.h | 4
-rw-r--r--  src/mongo/db/repl/initial_sync.cpp | 2
-rw-r--r--  src/mongo/db/repl/initial_sync.h | 2
-rw-r--r--  src/mongo/db/repl/minvalid.cpp | 17
-rw-r--r--  src/mongo/db/repl/minvalid.h | 6
-rw-r--r--  src/mongo/db/repl/oplog.cpp | 56
-rw-r--r--  src/mongo/db/repl/oplog.h | 13
-rw-r--r--  src/mongo/db/repl/oplogreader.cpp | 23
-rw-r--r--  src/mongo/db/repl/oplogreader.h | 5
-rw-r--r--  src/mongo/db/repl/repl_client_info.cpp | 8
-rw-r--r--  src/mongo/db/repl/repl_client_info.h | 13
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h | 5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 24
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h | 11
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h | 2
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp | 27
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp | 12
-rw-r--r--  src/mongo/db/repl/rs_rollback.h | 5
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 50
-rw-r--r--  src/mongo/db/repl/sync_tail.h | 19
24 files changed, 208 insertions(+), 137 deletions(-)
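
Before the per-file hunks, a note on the data model this commit introduces: every oplog entry now carries a "t" (term) field next to "ts", and most of the replication bookkeeping below switches from a bare Timestamp to an OpTime, i.e. a (timestamp, term) pair. The following is a minimal, self-contained sketch of that pair; the Toy* names and the timestamp-first ordering are assumptions made for illustration, not the mongod OpTime class.

#include <cstdint>
#include <iostream>
#include <tuple>

// Illustrative stand-ins only; not mongo::Timestamp / mongo::repl::OpTime.
struct ToyTimestamp {
    uint32_t secs = 0;  // seconds
    uint32_t inc = 0;   // increment within the second
};

struct ToyOpTime {
    ToyTimestamp ts;     // the oplog entry's "ts" field
    long long term = 0;  // the new "t" field; stays 0 in master/slave mode
};

// Assumed ordering for illustration: timestamp first, then term.
bool operator<(const ToyOpTime& a, const ToyOpTime& b) {
    return std::tie(a.ts.secs, a.ts.inc, a.term) <
           std::tie(b.ts.secs, b.ts.inc, b.term);
}

int main() {
    ToyOpTime lastFetched{{100, 5}, 2};
    ToyOpTime remoteOldest{{100, 7}, 2};
    // The kind of comparison bgsync.cpp and oplogreader.cpp now perform on
    // OpTime values instead of on raw timestamps.
    std::cout << (lastFetched < remoteOldest) << "\n";  // prints 1
    return 0;
}

This pairing is also presumably why the sentinel values in bgsync.cpp and oplogreader.cpp below combine a maximal timestamp with std::numeric_limits<long long>::max() as the term, so the sentinel still orders after any real entry.
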
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b6e2fae6a97..d7db78aae5c 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -156,6 +156,7 @@ env.Library('repl_coordinator_impl',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'data_replicator',
+ 'repl_coordinator_global',
'repl_coordinator_interface',
'replica_set_messages',
'replication_executor',
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 74d66b7dca0..164cb3c99c7 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -103,8 +103,9 @@ namespace {
}
BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize),
- _lastOpTimeFetched(std::numeric_limits<int>::max(),
- 0),
+ _lastOpTimeFetched(
+ Timestamp(std::numeric_limits<int>::max(), 0),
+ std::numeric_limits<long long>::max()),
_lastAppliedHash(0),
_lastFetchedHash(0),
_pause(true),
@@ -227,7 +228,7 @@ namespace {
// find a target to sync from the last optime fetched
- Timestamp lastOpTimeFetched;
+ OpTime lastOpTimeFetched;
{
boost::unique_lock<boost::mutex> lock(_mutex);
lastOpTimeFetched = _lastOpTimeFetched;
@@ -250,7 +251,7 @@ namespace {
_replCoord->signalUpstreamUpdater();
}
- _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched);
+ _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched.getTimestamp());
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
@@ -353,8 +354,8 @@ namespace {
{
boost::unique_lock<boost::mutex> lock(_mutex);
_lastFetchedHash = o["h"].numberLong();
- _lastOpTimeFetched = o["ts"].timestamp();
- LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched.toStringPretty();
+ _lastOpTimeFetched = extractOpTime(o);
+ LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched;
}
}
}
@@ -401,10 +402,10 @@ namespace {
sleepsecs(2);
return true;
}
- Timestamp theirTS = theirLastOp["ts"].timestamp();
- if (theirTS < _lastOpTimeFetched) {
+ OpTime theirOpTime = extractOpTime(theirLastOp);
+ if (theirOpTime < _lastOpTimeFetched) {
log() << "we are ahead of the sync source, will try to roll back";
- syncRollback(txn, _replCoord->getMyLastOptime().getTimestamp(), &r, _replCoord);
+ syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord);
return true;
}
/* we're not ahead? maybe our new query got fresher data. best to come back and try again */
@@ -419,12 +420,12 @@ namespace {
}
BSONObj o = r.nextSafe();
- Timestamp ts = o["ts"].timestamp();
+ OpTime opTime = extractOpTime(o);
long long hash = o["h"].numberLong();
- if( ts != _lastOpTimeFetched || hash != _lastFetchedHash ) {
- log() << "our last op time fetched: " << _lastOpTimeFetched.toStringPretty();
- log() << "source's GTE: " << ts.toStringPretty();
- syncRollback(txn, _replCoord->getMyLastOptime().getTimestamp(), &r, _replCoord);
+ if ( opTime != _lastOpTimeFetched || hash != _lastFetchedHash ) {
+ log() << "our last op time fetched: " << _lastOpTimeFetched;
+ log() << "source's GTE: " << opTime;
+ syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord);
return true;
}
@@ -446,7 +447,7 @@ namespace {
_pause = true;
_syncSourceHost = HostAndPort();
- _lastOpTimeFetched = Timestamp(0,0);
+ _lastOpTimeFetched = OpTime();
_lastFetchedHash = 0;
_appliedBufferCondition.notify_all();
_pausedCondition.notify_all();
@@ -461,7 +462,7 @@ namespace {
// reset _last fields with current oplog data
_lastAppliedHash = updatedLastAppliedHash;
- _lastOpTimeFetched = _replCoord->getMyLastOptime().getTimestamp();
+ _lastOpTimeFetched = _replCoord->getMyLastOptime();
_lastFetchedHash = _lastAppliedHash;
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched <<
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 41654d55469..bc03f755f1f 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -32,6 +32,7 @@
#include "mongo/util/queue.h"
#include "mongo/db/repl/oplogreader.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/jsobj.h"
namespace mongo {
@@ -140,8 +141,7 @@ namespace repl {
// _mutex protects all of the class variables except _syncSourceReader and _buffer
mutable boost::mutex _mutex;
- // TODO(siyuan) Change to OpTime after adding term to oplogs.
- Timestamp _lastOpTimeFetched;
+ OpTime _lastOpTimeFetched;
// lastAppliedHash is used to generate a new hash for the following op, when primary.
long long _lastAppliedHash;
diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp
index 2af2810db29..494094862ba 100644
--- a/src/mongo/db/repl/initial_sync.cpp
+++ b/src/mongo/db/repl/initial_sync.cpp
@@ -48,7 +48,7 @@ namespace repl {
/* initial oplog application, during initial sync, after cloning.
*/
- void InitialSync::oplogApplication(OperationContext* txn, const Timestamp& endOpTime) {
+ void InitialSync::oplogApplication(OperationContext* txn, const OpTime& endOpTime) {
if (replSetForceInitialSyncFailure > 0) {
log() << "test code invoked, forced InitialSync failure: "
<< replSetForceInitialSyncFailure;
diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h
index 49bcd182f95..41c1310656c 100644
--- a/src/mongo/db/repl/initial_sync.h
+++ b/src/mongo/db/repl/initial_sync.h
@@ -46,7 +46,7 @@ namespace repl {
/**
* applies up to endOpTime, fetching missing documents as needed.
*/
- void oplogApplication(OperationContext* txn, const Timestamp& endOpTime);
+ void oplogApplication(OperationContext* txn, const OpTime& endOpTime);
// Initial sync will ignore all journal requirement flags and doesn't wait until
// operations are durable before updating the last OpTime.
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index 4ccfbb7eb7e..5cd3acd64e7 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -32,13 +32,13 @@
#include "mongo/db/repl/minvalid.h"
-#include "mongo/bson/timestamp.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -70,12 +70,15 @@ namespace {
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS);
}
- // TODO(siyuan) Change minValid to OpTime
- void setMinValid(OperationContext* ctx, Timestamp ts) {
+ void setMinValid(OperationContext* ctx, const OpTime& opTime) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(ctx, MODE_IX);
Lock::DBLock dblk(ctx->lockState(), "local", MODE_X);
- Helpers::putSingleton(ctx, minvalidNS, BSON("$set" << BSON("ts" << ts)));
+ Helpers::putSingleton(ctx,
+ minvalidNS,
+ BSON("$set" << BSON("ts" << opTime.getTimestamp() <<
+ "t" << opTime.getTerm())));
+
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS);
}
@@ -98,7 +101,7 @@ namespace {
MONGO_UNREACHABLE;
}
- Timestamp getMinValid(OperationContext* txn) {
+ OpTime getMinValid(OperationContext* txn) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(txn, MODE_IS);
Lock::DBLock dblk(txn->lockState(), "local", MODE_IS);
@@ -106,9 +109,9 @@ namespace {
BSONObj mv;
bool found = Helpers::getSingleton(txn, minvalidNS, mv);
if (found) {
- return mv["ts"].timestamp();
+ return extractOpTime(mv);
}
- return Timestamp();
+ return OpTime();
} MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS);
}
diff --git a/src/mongo/db/repl/minvalid.h b/src/mongo/db/repl/minvalid.h
index 853c96b6c27..2118809c424 100644
--- a/src/mongo/db/repl/minvalid.h
+++ b/src/mongo/db/repl/minvalid.h
@@ -31,9 +31,9 @@
namespace mongo {
class BSONObj;
class OperationContext;
- class Timestamp;
namespace repl {
+ class OpTime;
/**
* Helper functions for maintaining local.replset.minvalid collection contents.
@@ -61,7 +61,7 @@ namespace repl {
* consider the dataset consistent. Do not allow client reads if our last applied operation is
* before the minValid time.
*/
- void setMinValid(OperationContext* ctx, Timestamp ts);
- Timestamp getMinValid(OperationContext* txn);
+ void setMinValid(OperationContext* ctx, const OpTime& opTime);
+ OpTime getMinValid(OperationContext* txn);
}
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index c32599695ed..eed93e1561a 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -104,7 +104,7 @@ namespace {
// Synchronizes the section where a new Timestamp is generated and when it actually
// appears in the oplog.
mongo::mutex newOpMutex;
- boost::condition newOptimeNotifier;
+ boost::condition newTimestampNotifier;
static std::string _oplogCollectionName;
@@ -125,20 +125,25 @@ namespace {
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
- std::pair<Timestamp, long long> getNextOpTime(OperationContext* txn,
+ std::pair<OpTime, long long> getNextOpTime(OperationContext* txn,
Collection* oplog,
const char* ns,
ReplicationCoordinator* replCoord,
const char* opstr) {
boost::lock_guard<boost::mutex> lk(newOpMutex);
Timestamp ts = getNextGlobalTimestamp();
- newOptimeNotifier.notify_all();
+ newTimestampNotifier.notify_all();
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
- long long hashNew;
+ long long hashNew = 0;
+ long long term = 0;
+ // Set hash and term if we're in replset mode, otherwise they remain 0 in master/slave.
if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) {
+ // Current term. If we're not in a replset running protocol version 1, this could be
+ // the default value (0) or the last valid term before downgrade.
+ term = ReplClientInfo::forClient(txn->getClient()).getTerm();
hashNew = BackgroundSync::get()->getLastAppliedHash();
@@ -146,7 +151,6 @@ namespace {
if (*opstr == 'n') {
// 'n' operations are always logged
invariant(*ns == '\0');
-
// 'n' operations do not advance the hash, since they are not rolled back
}
else {
@@ -156,13 +160,10 @@ namespace {
BackgroundSync::get()->setLastAppliedHash(hashNew);
}
}
- else {
- hashNew = 0;
- }
- // TODO(siyuan) Use current term
- replCoord->setMyLastOptime(OpTime(ts, 0));
- return std::pair<Timestamp,long long>(ts, hashNew);
+ OpTime opTime(ts, term);
+ replCoord->setMyLastOptime(opTime);
+ return std::pair<OpTime,long long>(opTime, hashNew);
}
/**
@@ -284,7 +285,7 @@ namespace {
_localOplogCollection);
}
- std::pair<Timestamp, long long> slot = getNextOpTime(txn,
+ std::pair<OpTime, long long> slot = getNextOpTime(txn,
_localOplogCollection,
ns,
replCoord,
@@ -295,7 +296,8 @@ namespace {
*/
BSONObjBuilder b(256);
- b.append("ts", slot.first);
+ b.append("ts", slot.first.getTimestamp());
+ b.append("t", slot.first.getTerm());
b.append("h", slot.second);
b.append("v", OPLOG_VERSION);
b.append("op", opstr);
@@ -312,8 +314,7 @@ namespace {
OplogDocWriter writer( partial, obj );
checkOplogInsert( _localOplogCollection->insertDocument( txn, &writer, false ) );
- // TODO(siyuan) set term when logging ops.
- ReplClientInfo::forClient(txn->getClient()).setLastOp( OpTime(slot.first, 0) );
+ ReplClientInfo::forClient(txn->getClient()).setLastOp( slot.first );
}
OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
@@ -344,9 +345,7 @@ namespace {
it != ops.end();
++it) {
const BSONObj& op = *it;
- const Timestamp ts = op["ts"].timestamp();
- // TODO(siyuan) Parse "term" and fill this out
- const OpTime optime = OpTime(ts, 0);
+ const OpTime optime = extractOpTime(op);
checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false));
@@ -396,7 +395,7 @@ namespace {
}
if ( !rs )
- initOpTimeFromOplog(txn, _oplogCollectionName);
+ initTimestampFromOplog(txn, _oplogCollectionName);
return;
}
@@ -870,19 +869,24 @@ namespace {
boost::unique_lock<boost::mutex> lk(newOpMutex);
while (referenceTime == getLastSetTimestamp()) {
- if (boost::cv_status::timeout == newOptimeNotifier.wait_for(lk, timeout))
+ if (boost::cv_status::timeout == newTimestampNotifier.wait_for(lk, timeout))
return;
}
}
- // TODO(siyuan) Change to OpTime after adding term to oplog.
- void setNewOptime(const Timestamp& newTime) {
+ void setNewTimestamp(const Timestamp& newTime) {
boost::lock_guard<boost::mutex> lk(newOpMutex);
setGlobalTimestamp(newTime);
- newOptimeNotifier.notify_all();
+ newTimestampNotifier.notify_all();
+ }
+
+ OpTime extractOpTime(const BSONObj& op) {
+ const Timestamp ts = op["ts"].timestamp();
+ const long long term = op["t"].numberLong(); // Default to 0 if it's absent
+ return OpTime(ts, term);
}
- void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) {
+ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
DBDirectClient c(txn);
BSONObj lastOp = c.findOne(oplogNS,
Query().sort(reverseNaturalObj),
@@ -890,8 +894,8 @@ namespace {
QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
- LOG(1) << "replSet setting last OpTime";
- setNewOptime(lastOp[ "ts" ].timestamp());
+ LOG(1) << "replSet setting last Timestamp";
+ setNewTimestamp(lastOp[ "ts" ].timestamp());
}
}
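
The extractOpTime() helper added above leans on BSONElement::numberLong() returning 0 for a field that is not present, which is how oplog entries written before this change (no "t" field) come back as term 0. Below is a standalone sketch of that fallback, using a toy map in place of BSONObj so it runs without the mongod headers; ToyEntry and extractOpTimeSketch are illustrative names, not part of the tree.

#include <iostream>
#include <map>
#include <string>

// Toy stand-in for a BSON oplog entry: field name -> numeric value.
using ToyEntry = std::map<std::string, long long>;

struct ToyOpTime {
    long long ts = 0;
    long long term = 0;
};

// Mirrors the shape of extractOpTime(): read "ts", read "t", default term to 0.
ToyOpTime extractOpTimeSketch(const ToyEntry& op) {
    ToyOpTime out;
    auto ts = op.find("ts");
    if (ts != op.end()) {
        out.ts = ts->second;
    }
    auto t = op.find("t");
    out.term = (t != op.end()) ? t->second : 0;  // absent "t" parses as term 0
    return out;
}

int main() {
    ToyEntry oldEntry{{"ts", 1432058400}};            // pre-change entry, no "t"
    ToyEntry newEntry{{"ts", 1432058400}, {"t", 3}};  // entry with a term
    std::cout << extractOpTimeSketch(oldEntry).term << " "
              << extractOpTimeSketch(newEntry).term << "\n";  // prints "0 3"
    return 0;
}
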
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index a7565a6cdf2..4d25a94109d 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -116,14 +116,19 @@ namespace repl {
void waitForTimestampChange(const Timestamp& referenceTime, Microseconds timeout);
/**
- * Initializes the global OpTime with the value from the timestamp of the last oplog entry.
+ * Initializes the global Timestamp with the value from the timestamp of the last oplog entry.
*/
- void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS);
+ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS);
/**
- * Sets the global OpTime to be 'newTime'.
+ * Sets the global Timestamp to be 'newTime'.
*/
- void setNewOptime(const Timestamp& newTime);
+ void setNewTimestamp(const Timestamp& newTime);
+
+ /*
+ * Extracts the OpTime from an oplog entry.
+ */
+ OpTime extractOpTime(const BSONObj& op);
/**
* Detects the current replication mode and sets the "_oplogCollectionName" accordingly.
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index f8e72373253..4809a96546b 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -142,10 +142,11 @@ namespace repl {
}
void OplogReader::connectToSyncSource(OperationContext* txn,
- Timestamp lastOpTimeFetched,
+ const OpTime& lastOpTimeFetched,
ReplicationCoordinator* replCoord) {
- const Timestamp sentinel(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
- Timestamp oldestOpTimeSeen = sentinel;
+ const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
+ const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max());
+ OpTime oldestOpTimeSeen = sentinel;
invariant(conn() == NULL);
@@ -164,9 +165,9 @@ namespace repl {
// Connected to at least one member, but in all cases we were too stale to use them
// as a sync source.
- error() << "RS102 too stale to catch up";
- log() << "our last optime : " << lastOpTimeFetched.toStringLong();
- log() << "oldest available is " << oldestOpTimeSeen.toStringLong();
+ error() << "too stale to catch up";
+ log() << "our last optime : " << lastOpTimeFetched;
+ log() << "oldest available is " << oldestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
setMinValid(txn, oldestOpTimeSeen);
bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING);
@@ -188,15 +189,7 @@ namespace repl {
// Read the first (oldest) op and confirm that it's not newer than our last
// fetched op. Otherwise, we have fallen off the back of that source's oplog.
BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query()));
- BSONElement tsElem(remoteOldestOp["ts"]);
- if (tsElem.type() != bsonTimestamp) {
- // This member's got a bad op in its oplog.
- warning() << "oplog invalid format on node " << candidate.toString();
- resetConnection();
- replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10));
- continue;
- }
- Timestamp remoteOldOpTime = tsElem.timestamp();
+ OpTime remoteOldOpTime = extractOpTime(remoteOldestOp);
if (lastOpTimeFetched < remoteOldOpTime) {
// We're too stale to use this sync source.
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 017e2613281..66ac8f9c6c0 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -44,6 +44,7 @@ namespace mongo {
namespace repl {
class ReplicationCoordinator;
+ class OpTime;
// {"$natural": -1 }
extern const BSONObj reverseNaturalObj;
@@ -145,8 +146,8 @@ namespace repl {
* sync source blacklist.
* This function may throw DB exceptions.
*/
- void connectToSyncSource(OperationContext* txn,
- Timestamp lastOpTimeFetched,
+ void connectToSyncSource(OperationContext* txn,
+ const OpTime& lastOpTimeFetched,
ReplicationCoordinator* replCoord);
};
diff --git a/src/mongo/db/repl/repl_client_info.cpp b/src/mongo/db/repl/repl_client_info.cpp
index 2f8a1e72470..631c121c223 100644
--- a/src/mongo/db/repl/repl_client_info.cpp
+++ b/src/mongo/db/repl/repl_client_info.cpp
@@ -33,6 +33,7 @@
#include "mongo/base/init.h"
#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/util/decorable.h"
namespace mongo {
@@ -41,5 +42,12 @@ namespace repl {
const Client::Decoration<ReplClientInfo> ReplClientInfo::forClient =
Client::declareDecoration<ReplClientInfo>();
+ long long ReplClientInfo::getTerm() {
+ if (_cachedTerm == kUninitializedTerm) {
+ _cachedTerm = getGlobalReplicationCoordinator()->getTerm();
+ }
+ return _cachedTerm;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_client_info.h b/src/mongo/db/repl/repl_client_info.h
index 640e52dd7e6..69694f2795d 100644
--- a/src/mongo/db/repl/repl_client_info.h
+++ b/src/mongo/db/repl/repl_client_info.h
@@ -50,9 +50,22 @@ namespace repl {
void setRemoteID(OID rid) { _remoteId = rid; }
OID getRemoteID() const { return _remoteId; }
+ // If we haven't cached a term from the replication coordinator, get the current term
+ // and cache it for the lifetime of this client.
+ //
+ // Used by logOp() to attach the current term to each oplog entry. We assume the term
+ // does not change after it has been cached. This holds for write commands, since we
+ // acquire the global lock (IX) for write commands and stepping down also requires that
+ // lock (S). Stepping down kills all user operations, so there is no write after a
+ // step-down even in the case of yielding.
+ long long getTerm();
+
private:
+ static const long long kUninitializedTerm = -1;
+
OpTime _lastOp = OpTime();
OID _remoteId = OID();
+ long long _cachedTerm = kUninitializedTerm;
};
} // namespace repl
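
The comment above describes the contract for the new term cache: fetch the term from the coordinator at most once per client and reuse it while the global lock rules out a concurrent step-down. Stripped of the mongod types, that is a lazy-initialized member; the sketch below is an illustrative stand-in (TermCache and fetchCurrentTermFromCoordinator are made-up names), not the ReplClientInfo implementation.

#include <iostream>

class TermCache {
public:
    long long get() {
        if (_cached == kUninitialized) {
            // Placeholder for getGlobalReplicationCoordinator()->getTerm(),
            // which is the comparatively expensive call worth caching.
            _cached = fetchCurrentTermFromCoordinator();
        }
        return _cached;  // reused for the rest of this client's lifetime
    }

private:
    static constexpr long long kUninitialized = -1;

    long long fetchCurrentTermFromCoordinator() { return 1; }

    long long _cached = kUninitialized;
};

int main() {
    TermCache cache;
    // Only the first call pays for the coordinator lookup.
    std::cout << cache.get() << " " << cache.get() << "\n";  // prints "1 1"
    return 0;
}

Caching matters here because, as the replication_coordinator_impl.cpp hunks further down show, reading the term goes through work scheduled on the replication executor rather than a plain member read.
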
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 3a1dbb1bbc3..903e13fc758 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -620,6 +620,11 @@ namespace repl {
*/
virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0;
+ /**
+ * Return the current term.
+ */
+ virtual long long getTerm() = 0;
+
protected:
ReplicationCoordinator();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index c5ba2565810..0fbd1b9424d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -248,7 +248,7 @@ namespace {
}
void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) {
- setNewOptime(newTime);
+ setNewTimestamp(newTime);
}
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
@@ -276,8 +276,7 @@ namespace {
"\" in most recent " << rsOplogName <<
" entry to have type Timestamp, but found " << typeName(tsElement.type()));
}
- // TODO(siyuan) add term
- return StatusWith<OpTime>(OpTime(tsElement.timestamp(), 0));
+ return StatusWith<OpTime>(extractOpTime(oplogEntry));
}
catch (const DBException& ex) {
return StatusWith<OpTime>(ex.toStatus());
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 6069630e6d9..ec3d3391d2a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2711,5 +2711,29 @@ namespace {
_topCoord->summarizeAsHtml(output);
}
+ long long ReplicationCoordinatorImpl::getTerm() {
+ long long term = OpTime::kDefaultTerm;
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_getTerm_helper,
+ this,
+ stdx::placeholders::_1,
+ &term));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return term;
+ }
+ fassert(28660, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ return term;
+ }
+
+ void ReplicationCoordinatorImpl::_getTerm_helper(
+ const ReplicationExecutor::CallbackData& cbData,
+ long long* term) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ *term = _topCoord->getTerm();
+ }
+
} // namespace repl
} // namespace mongo
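
getTerm() above follows the executor pattern used elsewhere in this coordinator: schedule a callback that reads _topCoord->getTerm() on the executor thread, wait for it, and fall back to a default term if shutdown is already in progress. A rough standalone sketch of that schedule-then-wait shape, with std::async standing in for the replication executor and 0 standing in for OpTime::kDefaultTerm:

#include <future>
#include <iostream>

// Stand-in for state that is only read on the executor thread.
long long readTermOnExecutorThread() {
    return 3;  // placeholder for _topCoord->getTerm()
}

long long getTermSketch(bool shutdownInProgress) {
    long long term = 0;  // default term, also the fallback during shutdown
    if (shutdownInProgress) {
        return term;  // mirrors the ShutdownInProgress early return
    }
    // Schedule the read elsewhere and block until it completes, the same
    // shape as scheduleWork() followed by _replExecutor.wait().
    std::future<long long> pending =
        std::async(std::launch::async, readTermOnExecutorThread);
    term = pending.get();
    return term;
}

int main() {
    std::cout << getTermSketch(false) << " " << getTermSketch(true) << "\n";  // prints "3 0"
    return 0;
}
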
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index c03d0e94b16..4c4539eabe8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -264,6 +264,11 @@ namespace repl {
virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override;
+ /**
+ * Get current term from topology coordinator
+ */
+ long long getTerm() override;
+
// ================== Test support API ===================
/**
@@ -852,6 +857,12 @@ namespace repl {
void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackData& cbData,
ReplSetHtmlSummary* output);
+ /**
+ * Callback that gets the current term from topology coordinator.
+ */
+ void _getTerm_helper(const ReplicationExecutor::CallbackData& cbData, long long* term);
+
+
//
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 1f00a0ee521..4ca92dca19d 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -334,5 +334,7 @@ namespace repl {
void ReplicationCoordinatorMock::summarizeAsHtml(ReplSetHtmlSummary* output) {}
+ long long ReplicationCoordinatorMock::getTerm() { return OpTime::kDefaultTerm; }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 7cf584b18f4..7ac09e27c2d 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -203,6 +203,8 @@ namespace repl {
virtual void summarizeAsHtml(ReplSetHtmlSummary* output);
+ virtual long long getTerm();
+
private:
const ReplSettings _settings;
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index d8ceabaa98a..04e82151370 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -80,7 +80,7 @@ namespace {
ReplicationCoordinator* replCoord,
BackgroundSync* bgsync) {
// Clear minvalid
- setMinValid(txn, Timestamp());
+ setMinValid(txn, OpTime());
AutoGetDb autoDb(txn, "local", MODE_X);
massert(28585, "no local database found", autoDb.getDb());
@@ -219,16 +219,17 @@ namespace {
bool _initialSyncApplyOplog( OperationContext* ctx,
repl::SyncTail& syncer,
OplogReader* r) {
- // TODO(siyuan) Change to OpTime after adding term to op logs.
- const Timestamp startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime()
- .getTimestamp();
+ const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime();
BSONObj lastOp;
// If the fail point is set, exit failing.
if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) {
log() << "adding fake oplog entry to buffer.";
BackgroundSync::get()->pushTestOpToBuffer(
- BSON("ts" << startOpTime << "v" << 1 << "op" << "n"));
+ BSON("ts" << startOpTime.getTimestamp() <<
+ "t" << startOpTime.getTerm() <<
+ "v" << 1 <<
+ "op" << "n"));
return false;
}
@@ -257,7 +258,7 @@ namespace {
return false;
}
- Timestamp stopOpTime = lastOp["ts"].timestamp();
+ OpTime stopOpTime = extractOpTime(lastOp);
// If we already have what we need then return.
if (stopOpTime == startOpTime)
@@ -268,8 +269,7 @@ namespace {
// apply till stopOpTime
try {
- LOG(2) << "Applying oplog entries from " << startOpTime.toStringPretty()
- << " until " << stopOpTime.toStringPretty();
+ LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime;
syncer.oplogApplication(ctx, stopOpTime);
if (inShutdown()) {
@@ -356,11 +356,12 @@ namespace {
OplogReader r;
Timestamp now(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
+ OpTime nowOpTime(now, std::numeric_limits<long long>::max());
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
- // of oplog position, by passing in "now" as the last op fetched time.
- r.connectToSyncSource(&txn, now, replCoord);
+ // of oplog position, by passing in "now" with max term as the last op fetched time.
+ r.connectToSyncSource(&txn, nowOpTime, replCoord);
if (r.getHost().empty()) {
std::string msg =
"no valid sync sources found in current replset to do an initial sync";
@@ -416,7 +417,7 @@ namespace {
OpTime lastOptime = writeOpsToOplog(&txn, ops);
ReplClientInfo::forClient(txn.getClient()).setLastOp(lastOptime);
replCoord->setMyLastOptime(lastOptime);
- setNewOptime(lastOptime.getTimestamp());
+ setNewTimestamp(lastOptime.getTimestamp());
std::string msg = "oplog sync 1 of 3";
log() << msg;
@@ -465,9 +466,7 @@ namespace {
{
ScopedTransaction scopedXact(&txn, MODE_IX);
AutoGetDb autodb(&txn, "local", MODE_X);
- // TODO(siyuan) Change to OpTime after adding term to op logs.
- Timestamp lastOpTimeWritten(
- getGlobalReplicationCoordinator()->getMyLastOptime().getTimestamp());
+ OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime());
log() << "set minValid=" << lastOpTimeWritten;
// Initial sync is now complete. Flag this by setting minValid to the last thing
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 0f6c308c96c..3ceda67ffec 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -467,8 +467,8 @@ namespace {
// we have items we are writing that aren't from a point-in-time. thus best not to come
// online until we get to that point in freshness.
- Timestamp minValid = newMinValid["ts"].timestamp();
- log() << "minvalid=" << minValid.toStringLong();
+ OpTime minValid = extractOpTime(newMinValid);
+ log() << "minvalid=" << minValid;
setMinValid(txn, minValid);
// any full collection resyncs required?
@@ -572,8 +572,8 @@ namespace {
err = "can't get minvalid from sync source";
}
else {
- Timestamp minValid = newMinValid["ts"].timestamp();
- log() << "minvalid=" << minValid.toStringLong();
+ OpTime minValid = extractOpTime(newMinValid);
+ log() << "minvalid=" << minValid;
setMinValid(txn, minValid);
}
}
@@ -932,13 +932,13 @@ namespace {
} // namespace
void syncRollback(OperationContext* txn,
- Timestamp lastOpTimeApplied,
+ const OpTime& lastOpTimeApplied,
OplogReader* oplogreader,
ReplicationCoordinator* replCoord) {
// check that we are at minvalid, otherwise we cannot rollback as we may be in an
// inconsistent state
{
- Timestamp minvalid = getMinValid(txn);
+ OpTime minvalid = getMinValid(txn);
if( minvalid > lastOpTimeApplied ) {
severe() << "need to rollback, but in inconsistent state" << endl;
log() << "minvalid: " << minvalid.toString() << " our last optime: "
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index 55f1b887d92..bfe2c0c5621 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -33,7 +33,8 @@ namespace mongo {
class Timestamp;
namespace repl {
- class OplogReader;
+ class OplogReader;
+ class OpTime;
class ReplicationCoordinator;
/**
@@ -59,7 +60,7 @@ namespace repl {
*/
void syncRollback(OperationContext* txn,
- Timestamp lastOpTimeWritten,
+ const OpTime& lastOpTimeWritten,
OplogReader* oplogreader,
ReplicationCoordinator* replCoord);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 0c87acbf516..0e690b80c30 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -341,13 +341,13 @@ namespace {
// Doles out all the work to the writer pool threads and waits for them to complete
// static
- Timestamp SyncTail::multiApply(OperationContext* txn,
- const OpQueue& ops,
- threadpool::ThreadPool* prefetcherPool,
- threadpool::ThreadPool* writerPool,
- MultiSyncApplyFunc func,
- SyncTail* sync,
- bool supportsWaitingUntilDurable) {
+ OpTime SyncTail::multiApply(OperationContext* txn,
+ const OpQueue& ops,
+ threadpool::ThreadPool* prefetcherPool,
+ threadpool::ThreadPool* writerPool,
+ MultiSyncApplyFunc func,
+ SyncTail* sync,
+ bool supportsWaitingUntilDurable) {
invariant(prefetcherPool);
invariant(writerPool);
invariant(func);
@@ -381,7 +381,7 @@ namespace {
applyOps(writerVectors, writerPool, func, sync);
if (inShutdown()) {
- return Timestamp();
+ return OpTime();
}
const bool mustWaitUntilDurable = replCoord->isV1ElectionProtocol() &&
@@ -397,19 +397,19 @@ namespace {
}
ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOpTime);
replCoord->setMyLastOptime(lastOpTime);
- setNewOptime(lastOpTime.getTimestamp());
+ setNewTimestamp(lastOpTime.getTimestamp());
BackgroundSync::get()->notify(txn);
- return lastOpTime.getTimestamp();
+ return lastOpTime;
}
- void SyncTail::oplogApplication(OperationContext* txn, const Timestamp& endOpTime) {
+ void SyncTail::oplogApplication(OperationContext* txn, const OpTime& endOpTime) {
_applyOplogUntil(txn, endOpTime);
}
/* applies oplog from "now" until endOpTime using the applier threads for initial sync*/
- void SyncTail::_applyOplogUntil(OperationContext* txn, const Timestamp& endOpTime) {
+ void SyncTail::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTime) {
unsigned long long bytesApplied = 0;
unsigned long long entriesApplied = 0;
while (true) {
@@ -421,7 +421,7 @@ namespace {
// Check if we reached the end
const BSONObj currentOp = ops.back();
- const Timestamp currentOpTime = currentOp["ts"].timestamp();
+ const OpTime currentOpTime = extractOpTime(currentOp);
// When we reach the end return this batch
if (currentOpTime == endOpTime) {
@@ -451,14 +451,13 @@ namespace {
bytesApplied += ops.getSize();
entriesApplied += ops.getDeque().size();
- const Timestamp lastOpTime = multiApply(txn,
- ops,
- &_prefetcherPool,
- &_writerPool,
- _applyFunc,
- this,
- supportsWaitingUntilDurable());
-
+ const OpTime lastOpTime = multiApply(txn,
+ ops,
+ &_prefetcherPool,
+ &_writerPool,
+ _applyFunc,
+ this,
+ supportsWaitingUntilDurable());
if (inShutdown()) {
return;
}
@@ -467,7 +466,7 @@ namespace {
if (lastOpTime == endOpTime) {
LOG(1) << "SyncTail applied " << entriesApplied
<< " entries (" << bytesApplied << " bytes)"
- << " and finished at opTime " << endOpTime.toStringPretty();
+ << " and finished at opTime " << endOpTime;
return;
}
} // end of while (true)
@@ -493,8 +492,8 @@ namespace {
return;
}
- Timestamp minvalid = getMinValid(txn);
- if (minvalid > replCoord->getMyLastOptime().getTimestamp()) {
+ OpTime minvalid = getMinValid(txn);
+ if (minvalid > replCoord->getMyLastOptime()) {
return;
}
@@ -578,8 +577,7 @@ namespace {
// Set minValid to the last op to be applied in this next batch.
// This will cause this node to go into RECOVERING state
// if we should crash and restart before updating the oplog
- Timestamp minValid = lastOp["ts"].timestamp();
- setMinValid(&txn, minValid);
+ setMinValid(&txn, extractOpTime(lastOp));
multiApply(&txn,
ops,
&_prefetcherPool,
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 20c59da5050..6b549c4f3d8 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -44,6 +44,7 @@ namespace mongo {
namespace repl {
class BackgroundSyncInterface;
class ReplicationCoordinator;
+ class OpTime;
/**
* "Normal" replica set syncing
@@ -97,7 +98,7 @@ namespace repl {
/**
* Runs _applyOplogUntil(stopOpTime)
*/
- virtual void oplogApplication(OperationContext* txn, const Timestamp& stopOpTime);
+ virtual void oplogApplication(OperationContext* txn, const OpTime& stopOpTime);
void oplogApplication();
bool peek(BSONObj* obj);
@@ -156,20 +157,20 @@ namespace repl {
// Prefetch and write a deque of operations, using the supplied function.
// Initial Sync and Sync Tail each use a different function.
// Returns the last OpTime applied.
- static Timestamp multiApply(OperationContext* txn,
- const OpQueue& ops,
- threadpool::ThreadPool* prefetcherPool,
- threadpool::ThreadPool* writerPool,
- MultiSyncApplyFunc func,
- SyncTail* sync,
- bool supportsAwaitingCommit);
+ static OpTime multiApply(OperationContext* txn,
+ const OpQueue& ops,
+ threadpool::ThreadPool* prefetcherPool,
+ threadpool::ThreadPool* writerPool,
+ MultiSyncApplyFunc func,
+ SyncTail* sync,
+ bool supportsAwaitingCommit);
/**
* Applies oplog entries until reaching "endOpTime".
*
* NOTE:Will not transition or check states
*/
- void _applyOplogUntil(OperationContext* txn, const Timestamp& endOpTime);
+ void _applyOplogUntil(OperationContext* txn, const OpTime& endOpTime);
private:
std::string _hostname;