summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/rs_initialsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/rs_initialsync.cpp')
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp27
1 files changed, 13 insertions, 14 deletions
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index d8ceabaa98a..04e82151370 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -80,7 +80,7 @@ namespace {
ReplicationCoordinator* replCoord,
BackgroundSync* bgsync) {
// Clear minvalid
- setMinValid(txn, Timestamp());
+ setMinValid(txn, OpTime());
AutoGetDb autoDb(txn, "local", MODE_X);
massert(28585, "no local database found", autoDb.getDb());
@@ -219,16 +219,17 @@ namespace {
bool _initialSyncApplyOplog( OperationContext* ctx,
repl::SyncTail& syncer,
OplogReader* r) {
- // TODO(siyuan) Change to OpTime after adding term to op logs.
- const Timestamp startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime()
- .getTimestamp();
+ const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime();
BSONObj lastOp;
// If the fail point is set, exit failing.
if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) {
log() << "adding fake oplog entry to buffer.";
BackgroundSync::get()->pushTestOpToBuffer(
- BSON("ts" << startOpTime << "v" << 1 << "op" << "n"));
+ BSON("ts" << startOpTime.getTimestamp() <<
+ "t" << startOpTime.getTerm() <<
+ "v" << 1 <<
+ "op" << "n"));
return false;
}
@@ -257,7 +258,7 @@ namespace {
return false;
}
- Timestamp stopOpTime = lastOp["ts"].timestamp();
+ OpTime stopOpTime = extractOpTime(lastOp);
// If we already have what we need then return.
if (stopOpTime == startOpTime)
@@ -268,8 +269,7 @@ namespace {
// apply till stopOpTime
try {
- LOG(2) << "Applying oplog entries from " << startOpTime.toStringPretty()
- << " until " << stopOpTime.toStringPretty();
+ LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime;
syncer.oplogApplication(ctx, stopOpTime);
if (inShutdown()) {
@@ -356,11 +356,12 @@ namespace {
OplogReader r;
Timestamp now(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
+ OpTime nowOpTime(now, std::numeric_limits<long long>::max());
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
- // of oplog position, by passing in "now" as the last op fetched time.
- r.connectToSyncSource(&txn, now, replCoord);
+ // of oplog position, by passing in "now" with max term as the last op fetched time.
+ r.connectToSyncSource(&txn, nowOpTime, replCoord);
if (r.getHost().empty()) {
std::string msg =
"no valid sync sources found in current replset to do an initial sync";
@@ -416,7 +417,7 @@ namespace {
OpTime lastOptime = writeOpsToOplog(&txn, ops);
ReplClientInfo::forClient(txn.getClient()).setLastOp(lastOptime);
replCoord->setMyLastOptime(lastOptime);
- setNewOptime(lastOptime.getTimestamp());
+ setNewTimestamp(lastOptime.getTimestamp());
std::string msg = "oplog sync 1 of 3";
log() << msg;
@@ -465,9 +466,7 @@ namespace {
{
ScopedTransaction scopedXact(&txn, MODE_IX);
AutoGetDb autodb(&txn, "local", MODE_X);
- // TODO(siyuan) Change to OpTime after adding term to op logs.
- Timestamp lastOpTimeWritten(
- getGlobalReplicationCoordinator()->getMyLastOptime().getTimestamp());
+ OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime());
log() << "set minValid=" << lastOpTimeWritten;
// Initial sync is now complete. Flag this by setting minValid to the last thing