diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2015-05-19 15:10:31 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2015-05-26 17:23:02 -0400 |
commit | 11237ffbb01dcfc810dccae6029d569afa4473db (patch) | |
tree | 9c4ba5fbf2ee32bce958d426b6627b7a0fb102fb /src | |
parent | 084e41d202d6757504d2cc338f4c0fe0cfe8babb (diff) | |
download | mongo-11237ffbb01dcfc810dccae6029d569afa4473db.tar.gz |
SERVER-18216 Add term to oplog.
Diffstat (limited to 'src')
24 files changed, 208 insertions, 137 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b6e2fae6a97..d7db78aae5c 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -156,6 +156,7 @@ env.Library('repl_coordinator_impl', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', 'data_replicator', + 'repl_coordinator_global', 'repl_coordinator_interface', 'replica_set_messages', 'replication_executor', diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 74d66b7dca0..164cb3c99c7 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -103,8 +103,9 @@ namespace { } BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), - _lastOpTimeFetched(std::numeric_limits<int>::max(), - 0), + _lastOpTimeFetched( + Timestamp(std::numeric_limits<int>::max(), 0), + std::numeric_limits<long long>::max()), _lastAppliedHash(0), _lastFetchedHash(0), _pause(true), @@ -227,7 +228,7 @@ namespace { // find a target to sync from the last optime fetched - Timestamp lastOpTimeFetched; + OpTime lastOpTimeFetched; { boost::unique_lock<boost::mutex> lock(_mutex); lastOpTimeFetched = _lastOpTimeFetched; @@ -250,7 +251,7 @@ namespace { _replCoord->signalUpstreamUpdater(); } - _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched); + _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched.getTimestamp()); // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor @@ -353,8 +354,8 @@ namespace { { boost::unique_lock<boost::mutex> lock(_mutex); _lastFetchedHash = o["h"].numberLong(); - _lastOpTimeFetched = o["ts"].timestamp(); - LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched.toStringPretty(); + _lastOpTimeFetched = extractOpTime(o); + LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched; } } } @@ -401,10 +402,10 @@ namespace { sleepsecs(2); return true; } - Timestamp theirTS = theirLastOp["ts"].timestamp(); - if (theirTS < _lastOpTimeFetched) { + OpTime theirOpTime = extractOpTime(theirLastOp); + if (theirOpTime < _lastOpTimeFetched) { log() << "we are ahead of the sync source, will try to roll back"; - syncRollback(txn, _replCoord->getMyLastOptime().getTimestamp(), &r, _replCoord); + syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord); return true; } /* we're not ahead? maybe our new query got fresher data. best to come back and try again */ @@ -419,12 +420,12 @@ namespace { } BSONObj o = r.nextSafe(); - Timestamp ts = o["ts"].timestamp(); + OpTime opTime = extractOpTime(o); long long hash = o["h"].numberLong(); - if( ts != _lastOpTimeFetched || hash != _lastFetchedHash ) { - log() << "our last op time fetched: " << _lastOpTimeFetched.toStringPretty(); - log() << "source's GTE: " << ts.toStringPretty(); - syncRollback(txn, _replCoord->getMyLastOptime().getTimestamp(), &r, _replCoord); + if ( opTime != _lastOpTimeFetched || hash != _lastFetchedHash ) { + log() << "our last op time fetched: " << _lastOpTimeFetched; + log() << "source's GTE: " << opTime; + syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord); return true; } @@ -446,7 +447,7 @@ namespace { _pause = true; _syncSourceHost = HostAndPort(); - _lastOpTimeFetched = Timestamp(0,0); + _lastOpTimeFetched = OpTime(); _lastFetchedHash = 0; _appliedBufferCondition.notify_all(); _pausedCondition.notify_all(); @@ -461,7 +462,7 @@ namespace { // reset _last fields with current oplog data _lastAppliedHash = updatedLastAppliedHash; - _lastOpTimeFetched = _replCoord->getMyLastOptime().getTimestamp(); + _lastOpTimeFetched = _replCoord->getMyLastOptime(); _lastFetchedHash = _lastAppliedHash; LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 41654d55469..bc03f755f1f 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -32,6 +32,7 @@ #include "mongo/util/queue.h" #include "mongo/db/repl/oplogreader.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/jsobj.h" namespace mongo { @@ -140,8 +141,7 @@ namespace repl { // _mutex protects all of the class variables except _syncSourceReader and _buffer mutable boost::mutex _mutex; - // TODO(siyuan) Change to OpTime after adding term to oplogs. - Timestamp _lastOpTimeFetched; + OpTime _lastOpTimeFetched; // lastAppliedHash is used to generate a new hash for the following op, when primary. long long _lastAppliedHash; diff --git a/src/mongo/db/repl/initial_sync.cpp b/src/mongo/db/repl/initial_sync.cpp index 2af2810db29..494094862ba 100644 --- a/src/mongo/db/repl/initial_sync.cpp +++ b/src/mongo/db/repl/initial_sync.cpp @@ -48,7 +48,7 @@ namespace repl { /* initial oplog application, during initial sync, after cloning. */ - void InitialSync::oplogApplication(OperationContext* txn, const Timestamp& endOpTime) { + void InitialSync::oplogApplication(OperationContext* txn, const OpTime& endOpTime) { if (replSetForceInitialSyncFailure > 0) { log() << "test code invoked, forced InitialSync failure: " << replSetForceInitialSyncFailure; diff --git a/src/mongo/db/repl/initial_sync.h b/src/mongo/db/repl/initial_sync.h index 49bcd182f95..41c1310656c 100644 --- a/src/mongo/db/repl/initial_sync.h +++ b/src/mongo/db/repl/initial_sync.h @@ -46,7 +46,7 @@ namespace repl { /** * applies up to endOpTime, fetching missing documents as needed. */ - void oplogApplication(OperationContext* txn, const Timestamp& endOpTime); + void oplogApplication(OperationContext* txn, const OpTime& endOpTime); // Initial sync will ignore all journal requirement flags and doesn't wait until // operations are durable before updating the last OpTime. diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp index 4ccfbb7eb7e..5cd3acd64e7 100644 --- a/src/mongo/db/repl/minvalid.cpp +++ b/src/mongo/db/repl/minvalid.cpp @@ -32,13 +32,13 @@ #include "mongo/db/repl/minvalid.h" -#include "mongo/bson/timestamp.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/operation_context_impl.h" +#include "mongo/db/repl/oplog.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -70,12 +70,15 @@ namespace { } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS); } - // TODO(siyuan) Change minValid to OpTime - void setMinValid(OperationContext* ctx, Timestamp ts) { + void setMinValid(OperationContext* ctx, const OpTime& opTime) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(ctx, MODE_IX); Lock::DBLock dblk(ctx->lockState(), "local", MODE_X); - Helpers::putSingleton(ctx, minvalidNS, BSON("$set" << BSON("ts" << ts))); + Helpers::putSingleton(ctx, + minvalidNS, + BSON("$set" << BSON("ts" << opTime.getTimestamp() << + "t" << opTime.getTerm()))); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS); } @@ -98,7 +101,7 @@ namespace { MONGO_UNREACHABLE; } - Timestamp getMinValid(OperationContext* txn) { + OpTime getMinValid(OperationContext* txn) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IS); Lock::DBLock dblk(txn->lockState(), "local", MODE_IS); @@ -106,9 +109,9 @@ namespace { BSONObj mv; bool found = Helpers::getSingleton(txn, minvalidNS, mv); if (found) { - return mv["ts"].timestamp(); + return extractOpTime(mv); } - return Timestamp(); + return OpTime(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS); } diff --git a/src/mongo/db/repl/minvalid.h b/src/mongo/db/repl/minvalid.h index 853c96b6c27..2118809c424 100644 --- a/src/mongo/db/repl/minvalid.h +++ b/src/mongo/db/repl/minvalid.h @@ -31,9 +31,9 @@ namespace mongo { class BSONObj; class OperationContext; - class Timestamp; namespace repl { + class OpTime; /** * Helper functions for maintaining local.replset.minvalid collection contents. @@ -61,7 +61,7 @@ namespace repl { * consider the dataset consistent. Do not allow client reads if our last applied operation is * before the minValid time. */ - void setMinValid(OperationContext* ctx, Timestamp ts); - Timestamp getMinValid(OperationContext* txn); + void setMinValid(OperationContext* ctx, const OpTime& opTime); + OpTime getMinValid(OperationContext* txn); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index c32599695ed..eed93e1561a 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -104,7 +104,7 @@ namespace { // Synchronizes the section where a new Timestamp is generated and when it actually // appears in the oplog. mongo::mutex newOpMutex; - boost::condition newOptimeNotifier; + boost::condition newTimestampNotifier; static std::string _oplogCollectionName; @@ -125,20 +125,25 @@ namespace { * function registers the new optime with the storage system and the replication coordinator, * and provides no facility to revert those registrations on rollback. */ - std::pair<Timestamp, long long> getNextOpTime(OperationContext* txn, + std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, Collection* oplog, const char* ns, ReplicationCoordinator* replCoord, const char* opstr) { boost::lock_guard<boost::mutex> lk(newOpMutex); Timestamp ts = getNextGlobalTimestamp(); - newOptimeNotifier.notify_all(); + newTimestampNotifier.notify_all(); fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); - long long hashNew; + long long hashNew = 0; + long long term = 0; + // Set hash and term if we're in replset mode, otherwise they remain 0 in master/slave. if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) { + // Current term. If we're not a replset of pv=1, it could be the default value (0) or + // the last valid term before downgrade. + term = ReplClientInfo::forClient(txn->getClient()).getTerm(); hashNew = BackgroundSync::get()->getLastAppliedHash(); @@ -146,7 +151,6 @@ namespace { if (*opstr == 'n') { // 'n' operations are always logged invariant(*ns == '\0'); - // 'n' operations do not advance the hash, since they are not rolled back } else { @@ -156,13 +160,10 @@ namespace { BackgroundSync::get()->setLastAppliedHash(hashNew); } } - else { - hashNew = 0; - } - // TODO(siyuan) Use current term - replCoord->setMyLastOptime(OpTime(ts, 0)); - return std::pair<Timestamp,long long>(ts, hashNew); + OpTime opTime(ts, term); + replCoord->setMyLastOptime(opTime); + return std::pair<OpTime,long long>(opTime, hashNew); } /** @@ -284,7 +285,7 @@ namespace { _localOplogCollection); } - std::pair<Timestamp, long long> slot = getNextOpTime(txn, + std::pair<OpTime, long long> slot = getNextOpTime(txn, _localOplogCollection, ns, replCoord, @@ -295,7 +296,8 @@ namespace { */ BSONObjBuilder b(256); - b.append("ts", slot.first); + b.append("ts", slot.first.getTimestamp()); + b.append("t", slot.first.getTerm()); b.append("h", slot.second); b.append("v", OPLOG_VERSION); b.append("op", opstr); @@ -312,8 +314,7 @@ namespace { OplogDocWriter writer( partial, obj ); checkOplogInsert( _localOplogCollection->insertDocument( txn, &writer, false ) ); - // TODO(siyuan) set term when logging ops. - ReplClientInfo::forClient(txn->getClient()).setLastOp( OpTime(slot.first, 0) ); + ReplClientInfo::forClient(txn->getClient()).setLastOp( slot.first ); } OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { @@ -344,9 +345,7 @@ namespace { it != ops.end(); ++it) { const BSONObj& op = *it; - const Timestamp ts = op["ts"].timestamp(); - // TODO(siyuan) Parse "term" and fill this out - const OpTime optime = OpTime(ts, 0); + const OpTime optime = extractOpTime(op); checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false)); @@ -396,7 +395,7 @@ namespace { } if ( !rs ) - initOpTimeFromOplog(txn, _oplogCollectionName); + initTimestampFromOplog(txn, _oplogCollectionName); return; } @@ -870,19 +869,24 @@ namespace { boost::unique_lock<boost::mutex> lk(newOpMutex); while (referenceTime == getLastSetTimestamp()) { - if (boost::cv_status::timeout == newOptimeNotifier.wait_for(lk, timeout)) + if (boost::cv_status::timeout == newTimestampNotifier.wait_for(lk, timeout)) return; } } - // TODO(siyuan) Change to OpTime after adding term to oplog. - void setNewOptime(const Timestamp& newTime) { + void setNewTimestamp(const Timestamp& newTime) { boost::lock_guard<boost::mutex> lk(newOpMutex); setGlobalTimestamp(newTime); - newOptimeNotifier.notify_all(); + newTimestampNotifier.notify_all(); + } + + OpTime extractOpTime(const BSONObj& op) { + const Timestamp ts = op["ts"].timestamp(); + const long long term = op["t"].numberLong(); // Default to 0 if it's absent + return OpTime(ts, term); } - void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS) { + void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { DBDirectClient c(txn); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), @@ -890,8 +894,8 @@ namespace { QueryOption_SlaveOk); if (!lastOp.isEmpty()) { - LOG(1) << "replSet setting last OpTime"; - setNewOptime(lastOp[ "ts" ].timestamp()); + LOG(1) << "replSet setting last Timestamp"; + setNewTimestamp(lastOp[ "ts" ].timestamp()); } } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index a7565a6cdf2..4d25a94109d 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -116,14 +116,19 @@ namespace repl { void waitForTimestampChange(const Timestamp& referenceTime, Microseconds timeout); /** - * Initializes the global OpTime with the value from the timestamp of the last oplog entry. + * Initializes the global Timestamp with the value from the timestamp of the last oplog entry. */ - void initOpTimeFromOplog(OperationContext* txn, const std::string& oplogNS); + void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS); /** - * Sets the global OpTime to be 'newTime'. + * Sets the global Timestamp to be 'newTime'. */ - void setNewOptime(const Timestamp& newTime); + void setNewTimestamp(const Timestamp& newTime); + + /* + * Extract the OpTime from log entry. + */ + OpTime extractOpTime(const BSONObj& op); /** * Detects the current replication mode and sets the "_oplogCollectionName" accordingly. diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index f8e72373253..4809a96546b 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -142,10 +142,11 @@ namespace repl { } void OplogReader::connectToSyncSource(OperationContext* txn, - Timestamp lastOpTimeFetched, + const OpTime& lastOpTimeFetched, ReplicationCoordinator* replCoord) { - const Timestamp sentinel(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0); - Timestamp oldestOpTimeSeen = sentinel; + const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0); + const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max()); + OpTime oldestOpTimeSeen = sentinel; invariant(conn() == NULL); @@ -164,9 +165,9 @@ namespace repl { // Connected to at least one member, but in all cases we were too stale to use them // as a sync source. - error() << "RS102 too stale to catch up"; - log() << "our last optime : " << lastOpTimeFetched.toStringLong(); - log() << "oldest available is " << oldestOpTimeSeen.toStringLong(); + error() << "too stale to catch up"; + log() << "our last optime : " << lastOpTimeFetched; + log() << "oldest available is " << oldestOpTimeSeen; log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; setMinValid(txn, oldestOpTimeSeen); bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING); @@ -188,15 +189,7 @@ namespace repl { // Read the first (oldest) op and confirm that it's not newer than our last // fetched op. Otherwise, we have fallen off the back of that source's oplog. BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query())); - BSONElement tsElem(remoteOldestOp["ts"]); - if (tsElem.type() != bsonTimestamp) { - // This member's got a bad op in its oplog. - warning() << "oplog invalid format on node " << candidate.toString(); - resetConnection(); - replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10)); - continue; - } - Timestamp remoteOldOpTime = tsElem.timestamp(); + OpTime remoteOldOpTime = extractOpTime(remoteOldestOp); if (lastOpTimeFetched < remoteOldOpTime) { // We're too stale to use this sync source. diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 017e2613281..66ac8f9c6c0 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -44,6 +44,7 @@ namespace mongo { namespace repl { class ReplicationCoordinator; + class OpTime; // {"$natural": -1 } extern const BSONObj reverseNaturalObj; @@ -145,8 +146,8 @@ namespace repl { * sync source blacklist. * This function may throw DB exceptions. */ - void connectToSyncSource(OperationContext* txn, - Timestamp lastOpTimeFetched, + void connectToSyncSource(OperationContext* txn, + const OpTime& lastOpTimeFetched, ReplicationCoordinator* replCoord); }; diff --git a/src/mongo/db/repl/repl_client_info.cpp b/src/mongo/db/repl/repl_client_info.cpp index 2f8a1e72470..631c121c223 100644 --- a/src/mongo/db/repl/repl_client_info.cpp +++ b/src/mongo/db/repl/repl_client_info.cpp @@ -33,6 +33,7 @@ #include "mongo/base/init.h" #include "mongo/db/client.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/util/decorable.h" namespace mongo { @@ -41,5 +42,12 @@ namespace repl { const Client::Decoration<ReplClientInfo> ReplClientInfo::forClient = Client::declareDecoration<ReplClientInfo>(); + long long ReplClientInfo::getTerm() { + if (_cachedTerm == kUninitializedTerm) { + _cachedTerm = getGlobalReplicationCoordinator()->getTerm(); + } + return _cachedTerm; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_client_info.h b/src/mongo/db/repl/repl_client_info.h index 640e52dd7e6..69694f2795d 100644 --- a/src/mongo/db/repl/repl_client_info.h +++ b/src/mongo/db/repl/repl_client_info.h @@ -50,9 +50,22 @@ namespace repl { void setRemoteID(OID rid) { _remoteId = rid; } OID getRemoteID() const { return _remoteId; } + // If we haven't cached a term from replication coordinator, get the current term + // and cache it during the life cycle of this client. + // + // Used by logOp() to attach the current term to each log entries. Assume we don't change + // the term since caching it. This is true for write commands, since we acquire the + // global lock (IX) for write commands and stepping down also needs that lock (S). + // Stepping down will kill all user operations, so there is no write after stepping down + // in the case of yielding. + long long getTerm(); + private: + static const long long kUninitializedTerm = -1; + OpTime _lastOp = OpTime(); OID _remoteId = OID(); + long long _cachedTerm = kUninitializedTerm; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 3a1dbb1bbc3..903e13fc758 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -620,6 +620,11 @@ namespace repl { */ virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0; + /** + * Return the current term. + */ + virtual long long getTerm() = 0; + protected: ReplicationCoordinator(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c5ba2565810..0fbd1b9424d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -248,7 +248,7 @@ namespace { } void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { - setNewOptime(newTime); + setNewTimestamp(newTime); } StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( @@ -276,8 +276,7 @@ namespace { "\" in most recent " << rsOplogName << " entry to have type Timestamp, but found " << typeName(tsElement.type())); } - // TODO(siyuan) add term - return StatusWith<OpTime>(OpTime(tsElement.timestamp(), 0)); + return StatusWith<OpTime>(extractOpTime(oplogEntry)); } catch (const DBException& ex) { return StatusWith<OpTime>(ex.toStatus()); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 6069630e6d9..ec3d3391d2a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2711,5 +2711,29 @@ namespace { _topCoord->summarizeAsHtml(output); } + long long ReplicationCoordinatorImpl::getTerm() { + long long term = OpTime::kDefaultTerm; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_getTerm_helper, + this, + stdx::placeholders::_1, + &term)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return term; + } + fassert(28660, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return term; + } + + void ReplicationCoordinatorImpl::_getTerm_helper( + const ReplicationExecutor::CallbackData& cbData, + long long* term) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + *term = _topCoord->getTerm(); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index c03d0e94b16..4c4539eabe8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -264,6 +264,11 @@ namespace repl { virtual void summarizeAsHtml(ReplSetHtmlSummary* s) override; + /** + * Get current term from topology coordinator + */ + long long getTerm() override; + // ================== Test support API =================== /** @@ -852,6 +857,12 @@ namespace repl { void _summarizeAsHtml_finish(const ReplicationExecutor::CallbackData& cbData, ReplSetHtmlSummary* output); + /** + * Callback that gets the current term from topology coordinator. + */ + void _getTerm_helper(const ReplicationExecutor::CallbackData& cbData, long long* term); + + // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 1f00a0ee521..4ca92dca19d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -334,5 +334,7 @@ namespace repl { void ReplicationCoordinatorMock::summarizeAsHtml(ReplSetHtmlSummary* output) {} + long long ReplicationCoordinatorMock::getTerm() { return OpTime::kDefaultTerm; } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 7cf584b18f4..7ac09e27c2d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -203,6 +203,8 @@ namespace repl { virtual void summarizeAsHtml(ReplSetHtmlSummary* output); + virtual long long getTerm(); + private: const ReplSettings _settings; diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index d8ceabaa98a..04e82151370 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -80,7 +80,7 @@ namespace { ReplicationCoordinator* replCoord, BackgroundSync* bgsync) { // Clear minvalid - setMinValid(txn, Timestamp()); + setMinValid(txn, OpTime()); AutoGetDb autoDb(txn, "local", MODE_X); massert(28585, "no local database found", autoDb.getDb()); @@ -219,16 +219,17 @@ namespace { bool _initialSyncApplyOplog( OperationContext* ctx, repl::SyncTail& syncer, OplogReader* r) { - // TODO(siyuan) Change to OpTime after adding term to op logs. - const Timestamp startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime() - .getTimestamp(); + const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime(); BSONObj lastOp; // If the fail point is set, exit failing. if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) { log() << "adding fake oplog entry to buffer."; BackgroundSync::get()->pushTestOpToBuffer( - BSON("ts" << startOpTime << "v" << 1 << "op" << "n")); + BSON("ts" << startOpTime.getTimestamp() << + "t" << startOpTime.getTerm() << + "v" << 1 << + "op" << "n")); return false; } @@ -257,7 +258,7 @@ namespace { return false; } - Timestamp stopOpTime = lastOp["ts"].timestamp(); + OpTime stopOpTime = extractOpTime(lastOp); // If we already have what we need then return. if (stopOpTime == startOpTime) @@ -268,8 +269,7 @@ namespace { // apply till stopOpTime try { - LOG(2) << "Applying oplog entries from " << startOpTime.toStringPretty() - << " until " << stopOpTime.toStringPretty(); + LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime; syncer.oplogApplication(ctx, stopOpTime); if (inShutdown()) { @@ -356,11 +356,12 @@ namespace { OplogReader r; Timestamp now(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0); + OpTime nowOpTime(now, std::numeric_limits<long long>::max()); while (r.getHost().empty()) { // We must prime the sync source selector so that it considers all candidates regardless - // of oplog position, by passing in "now" as the last op fetched time. - r.connectToSyncSource(&txn, now, replCoord); + // of oplog position, by passing in "now" with max term as the last op fetched time. + r.connectToSyncSource(&txn, nowOpTime, replCoord); if (r.getHost().empty()) { std::string msg = "no valid sync sources found in current replset to do an initial sync"; @@ -416,7 +417,7 @@ namespace { OpTime lastOptime = writeOpsToOplog(&txn, ops); ReplClientInfo::forClient(txn.getClient()).setLastOp(lastOptime); replCoord->setMyLastOptime(lastOptime); - setNewOptime(lastOptime.getTimestamp()); + setNewTimestamp(lastOptime.getTimestamp()); std::string msg = "oplog sync 1 of 3"; log() << msg; @@ -465,9 +466,7 @@ namespace { { ScopedTransaction scopedXact(&txn, MODE_IX); AutoGetDb autodb(&txn, "local", MODE_X); - // TODO(siyuan) Change to OpTime after adding term to op logs. - Timestamp lastOpTimeWritten( - getGlobalReplicationCoordinator()->getMyLastOptime().getTimestamp()); + OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime()); log() << "set minValid=" << lastOpTimeWritten; // Initial sync is now complete. Flag this by setting minValid to the last thing diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 0f6c308c96c..3ceda67ffec 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -467,8 +467,8 @@ namespace { // we have items we are writing that aren't from a point-in-time. thus best not to come // online until we get to that point in freshness. - Timestamp minValid = newMinValid["ts"].timestamp(); - log() << "minvalid=" << minValid.toStringLong(); + OpTime minValid = extractOpTime(newMinValid); + log() << "minvalid=" << minValid; setMinValid(txn, minValid); // any full collection resyncs required? @@ -572,8 +572,8 @@ namespace { err = "can't get minvalid from sync source"; } else { - Timestamp minValid = newMinValid["ts"].timestamp(); - log() << "minvalid=" << minValid.toStringLong(); + OpTime minValid = extractOpTime(newMinValid); + log() << "minvalid=" << minValid; setMinValid(txn, minValid); } } @@ -932,13 +932,13 @@ namespace { } // namespace void syncRollback(OperationContext* txn, - Timestamp lastOpTimeApplied, + const OpTime& lastOpTimeApplied, OplogReader* oplogreader, ReplicationCoordinator* replCoord) { // check that we are at minvalid, otherwise we cannot rollback as we may be in an // inconsistent state { - Timestamp minvalid = getMinValid(txn); + OpTime minvalid = getMinValid(txn); if( minvalid > lastOpTimeApplied ) { severe() << "need to rollback, but in inconsistent state" << endl; log() << "minvalid: " << minvalid.toString() << " our last optime: " diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index 55f1b887d92..bfe2c0c5621 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -33,7 +33,8 @@ namespace mongo { class Timestamp; namespace repl { - class OplogReader; + class OplogReader; + class OpTime; class ReplicationCoordinator; /** @@ -59,7 +60,7 @@ namespace repl { */ void syncRollback(OperationContext* txn, - Timestamp lastOpTimeWritten, + const OpTime& lastOpTimeWritten, OplogReader* oplogreader, ReplicationCoordinator* replCoord); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 0c87acbf516..0e690b80c30 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -341,13 +341,13 @@ namespace { // Doles out all the work to the writer pool threads and waits for them to complete // static - Timestamp SyncTail::multiApply(OperationContext* txn, - const OpQueue& ops, - threadpool::ThreadPool* prefetcherPool, - threadpool::ThreadPool* writerPool, - MultiSyncApplyFunc func, - SyncTail* sync, - bool supportsWaitingUntilDurable) { + OpTime SyncTail::multiApply(OperationContext* txn, + const OpQueue& ops, + threadpool::ThreadPool* prefetcherPool, + threadpool::ThreadPool* writerPool, + MultiSyncApplyFunc func, + SyncTail* sync, + bool supportsWaitingUntilDurable) { invariant(prefetcherPool); invariant(writerPool); invariant(func); @@ -381,7 +381,7 @@ namespace { applyOps(writerVectors, writerPool, func, sync); if (inShutdown()) { - return Timestamp(); + return OpTime(); } const bool mustWaitUntilDurable = replCoord->isV1ElectionProtocol() && @@ -397,19 +397,19 @@ namespace { } ReplClientInfo::forClient(txn->getClient()).setLastOp(lastOpTime); replCoord->setMyLastOptime(lastOpTime); - setNewOptime(lastOpTime.getTimestamp()); + setNewTimestamp(lastOpTime.getTimestamp()); BackgroundSync::get()->notify(txn); - return lastOpTime.getTimestamp(); + return lastOpTime; } - void SyncTail::oplogApplication(OperationContext* txn, const Timestamp& endOpTime) { + void SyncTail::oplogApplication(OperationContext* txn, const OpTime& endOpTime) { _applyOplogUntil(txn, endOpTime); } /* applies oplog from "now" until endOpTime using the applier threads for initial sync*/ - void SyncTail::_applyOplogUntil(OperationContext* txn, const Timestamp& endOpTime) { + void SyncTail::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTime) { unsigned long long bytesApplied = 0; unsigned long long entriesApplied = 0; while (true) { @@ -421,7 +421,7 @@ namespace { // Check if we reached the end const BSONObj currentOp = ops.back(); - const Timestamp currentOpTime = currentOp["ts"].timestamp(); + const OpTime currentOpTime = extractOpTime(currentOp); // When we reach the end return this batch if (currentOpTime == endOpTime) { @@ -451,14 +451,13 @@ namespace { bytesApplied += ops.getSize(); entriesApplied += ops.getDeque().size(); - const Timestamp lastOpTime = multiApply(txn, - ops, - &_prefetcherPool, - &_writerPool, - _applyFunc, - this, - supportsWaitingUntilDurable()); - + const OpTime lastOpTime = multiApply(txn, + ops, + &_prefetcherPool, + &_writerPool, + _applyFunc, + this, + supportsWaitingUntilDurable()); if (inShutdown()) { return; } @@ -467,7 +466,7 @@ namespace { if (lastOpTime == endOpTime) { LOG(1) << "SyncTail applied " << entriesApplied << " entries (" << bytesApplied << " bytes)" - << " and finished at opTime " << endOpTime.toStringPretty(); + << " and finished at opTime " << endOpTime; return; } } // end of while (true) @@ -493,8 +492,8 @@ namespace { return; } - Timestamp minvalid = getMinValid(txn); - if (minvalid > replCoord->getMyLastOptime().getTimestamp()) { + OpTime minvalid = getMinValid(txn); + if (minvalid > replCoord->getMyLastOptime()) { return; } @@ -578,8 +577,7 @@ namespace { // Set minValid to the last op to be applied in this next batch. // This will cause this node to go into RECOVERING state // if we should crash and restart before updating the oplog - Timestamp minValid = lastOp["ts"].timestamp(); - setMinValid(&txn, minValid); + setMinValid(&txn, extractOpTime(lastOp)); multiApply(&txn, ops, &_prefetcherPool, diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 20c59da5050..6b549c4f3d8 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -44,6 +44,7 @@ namespace mongo { namespace repl { class BackgroundSyncInterface; class ReplicationCoordinator; + class OpTime; /** * "Normal" replica set syncing @@ -97,7 +98,7 @@ namespace repl { /** * Runs _applyOplogUntil(stopOpTime) */ - virtual void oplogApplication(OperationContext* txn, const Timestamp& stopOpTime); + virtual void oplogApplication(OperationContext* txn, const OpTime& stopOpTime); void oplogApplication(); bool peek(BSONObj* obj); @@ -156,20 +157,20 @@ namespace repl { // Prefetch and write a deque of operations, using the supplied function. // Initial Sync and Sync Tail each use a different function. // Returns the last OpTime applied. - static Timestamp multiApply(OperationContext* txn, - const OpQueue& ops, - threadpool::ThreadPool* prefetcherPool, - threadpool::ThreadPool* writerPool, - MultiSyncApplyFunc func, - SyncTail* sync, - bool supportsAwaitingCommit); + static OpTime multiApply(OperationContext* txn, + const OpQueue& ops, + threadpool::ThreadPool* prefetcherPool, + threadpool::ThreadPool* writerPool, + MultiSyncApplyFunc func, + SyncTail* sync, + bool supportsAwaitingCommit); /** * Applies oplog entries until reaching "endOpTime". * * NOTE:Will not transition or check states */ - void _applyOplogUntil(OperationContext* txn, const Timestamp& endOpTime); + void _applyOplogUntil(OperationContext* txn, const OpTime& endOpTime); private: std::string _hostname; |