diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-08-12 13:24:35 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-08-12 17:19:31 -0400 |
commit | 2482d4387b6a0fcdf825e07d7f5260041a89f3f6 (patch) | |
tree | 165b54668d3d7589e71c702db330f40109e89227 | |
parent | 7e8fb8df8eb69c7817a8fd00b78ae2f617d9f10d (diff) | |
download | mongo-2482d4387b6a0fcdf825e07d7f5260041a89f3f6.tar.gz |
SERVER-19855 Parsing/serialization logic for OpTime
This change pulls the OpTime serialization/deserialization logic to be
part of the class.
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/minvalid.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/optime.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/optime.h | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 4 |
11 files changed, 62 insertions, 36 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index cd82cdf5059..19b25760397 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -93,7 +93,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat "we are ahead of the sync source, will try to roll back"); } BSONObj o = result.getValue(); - OpTime opTime = extractOpTime(o); + OpTime opTime = fassertStatusOK(28778, OpTime::parseFromBSON(o)); long long hash = o["h"].numberLong(); if (opTime != lastOpTimeFetched || hash != lastHashFetched) { return Status(ErrorCodes::OplogStartMissing, @@ -461,7 +461,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& { stdx::unique_lock<stdx::mutex> lock(_mutex); _lastFetchedHash = o["h"].numberLong(); - _lastOpTimeFetched = extractOpTime(o); + _lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromBSON(o)); LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched; } } diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp index b14966486fe..5584ea54658 100644 --- a/src/mongo/db/repl/minvalid.cpp +++ b/src/mongo/db/repl/minvalid.cpp @@ -111,7 +111,7 @@ OpTime getMinValid(OperationContext* txn) { BSONObj mv; bool found = Helpers::getSingleton(txn, minvalidNS, mv); if (found) { - return extractOpTime(mv); + return fassertStatusOK(28771, OpTime::parseFromBSON(mv)); } return OpTime(); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index aa412056939..33b19800f8d 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -121,9 +121,6 @@ stdx::condition_variable newTimestampNotifier; static std::string _oplogCollectionName; -const std::string kTimestampFieldName = "ts"; -const std::string kTermFieldName = "t"; - // so we can fail the same way void checkOplogInsert(StatusWith<RecordId> result) { massert(17322, @@ -318,11 +315,8 @@ void _logOp(OperationContext* txn, */ BSONObjBuilder b(256); - b.append(kTimestampFieldName, slot.first.getTimestamp()); - // Don't add term in protocol version 0. - if (slot.first.getTerm() != OpTime::kProtocolVersionV0Term) { - b.append(kTermFieldName, slot.first.getTerm()); - } + + slot.first.append(&b); b.append("h", slot.second); b.append("v", OPLOG_VERSION); b.append("op", opstr); @@ -391,7 +385,7 @@ OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { const BSONObj& op = *it; - const OpTime optime = extractOpTime(op); + const OpTime optime = fassertStatusOK(28779, OpTime::parseFromBSON(op)); checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false)); @@ -908,23 +902,14 @@ void setNewTimestamp(const Timestamp& newTime) { newTimestampNotifier.notify_all(); } -OpTime extractOpTime(const BSONObj& op) { - const Timestamp ts = op[kTimestampFieldName].timestamp(); - long long term; - // Default to -1 if the term is absent. - fassert(28696, - bsonExtractIntegerFieldWithDefault( - op, kTermFieldName, OpTime::kProtocolVersionV0Term, &term)); - return OpTime(ts, term); -} - void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { DBDirectClient c(txn); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); if (!lastOp.isEmpty()) { LOG(1) << "replSet setting last Timestamp"; - setNewTimestamp(lastOp[kTimestampFieldName].timestamp()); + const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromBSON(lastOp)); + setNewTimestamp(opTime.getTimestamp()); } } @@ -1021,7 +1006,7 @@ void SnapshotThread::run() { continue; // oplog is completely empty. const auto op = record->data.releaseToBson(); - opTimeOfSnapshot = extractOpTime(op); + opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromBSON(op)); invariant(!opTimeOfSnapshot.isNull()); } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 5696d5ac713..f17d7916f99 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -141,11 +141,6 @@ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS); */ void setNewTimestamp(const Timestamp& newTime); -/* - * Extract the OpTime from log entry. - */ -OpTime extractOpTime(const BSONObj& op); - /** * Detects the current replication mode and sets the "_oplogCollectionName" accordingly. */ diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 06f2da78101..0a94ba73e87 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -183,7 +183,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn, // Read the first (oldest) op and confirm that it's not newer than our last // fetched op. Otherwise, we have fallen off the back of that source's oplog. BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query())); - OpTime remoteOldOpTime = extractOpTime(remoteOldestOp); + OpTime remoteOldOpTime = fassertStatusOK(28776, OpTime::parseFromBSON(remoteOldestOp)); // remoteOldOpTime may come from a very old config, so we cannot compare their terms. if (!lastOpTimeFetched.isNull() && diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp index 87cf966c1ef..31d83d91e4f 100644 --- a/src/mongo/db/repl/optime.cpp +++ b/src/mongo/db/repl/optime.cpp @@ -31,10 +31,18 @@ #include <string> #include <utility> +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/db/repl/optime.h" namespace mongo { namespace repl { +namespace { + +const char* kTimestampFieldName = "ts"; +const char* kTermFieldName = "t"; + +} // namespace OpTime::OpTime(Timestamp ts, long long term) : _timestamp(std::move(ts)), _term(term) {} @@ -54,6 +62,30 @@ bool OpTime::isNull() const { return _timestamp.isNull(); } +void OpTime::append(BSONObjBuilder* builder) const { + builder->append(kTimestampFieldName, _timestamp); + + // Don't add term in protocol version 0. + if (_term != kProtocolVersionV0Term) { + builder->append(kTermFieldName, _term); + } +} + +StatusWith<OpTime> OpTime::parseFromBSON(const BSONObj& obj) { + Timestamp ts; + Status status = bsonExtractTimestampField(obj, kTimestampFieldName, &ts); + if (!status.isOK()) + return status; + + // Default to -1 if the term is absent. + long long term; + status = bsonExtractIntegerFieldWithDefault(obj, kTermFieldName, kProtocolVersionV0Term, &term); + if (!status.isOK()) + return status; + + return OpTime(ts, term); +} + std::string OpTime::toString() const { std::stringstream ss; ss << "(term: " << _term << ", timestamp: " << _timestamp.toStringPretty() << ")"; diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index 00ec1c29f97..f41b4386c65 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -33,6 +33,12 @@ #include "mongo/bson/timestamp.h" namespace mongo { + +class BSONObj; +class BSONObjBuilder; +template <typename T> +class StatusWith; + namespace repl { /** @@ -62,6 +68,14 @@ public: long long getTerm() const; + /** + * Serializes the contents of this optime to the specified builder in the form: + * { ts: <timestamp>, t: term } + */ + void append(BSONObjBuilder* builder) const; + + static StatusWith<OpTime> parseFromBSON(const BSONObj& obj); + std::string toString() const; // Returns true when this OpTime is not yet initialized. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 9203cf76f9e..18ce6ba016d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -308,7 +308,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(Opera << " entry to have type Timestamp, but found " << typeName(tsElement.type())); } - return StatusWith<OpTime>(extractOpTime(oplogEntry)); + return OpTime::parseFromBSON(oplogEntry); } catch (const DBException& ex) { return StatusWith<OpTime>(ex.toStatus()); } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 8759a15e16e..d62b86f20e9 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -253,7 +253,7 @@ bool _initialSyncApplyOplog(OperationContext* ctx, repl::SyncTail& syncer, Oplog return false; } - OpTime stopOpTime = extractOpTime(lastOp); + OpTime stopOpTime = fassertStatusOK(28777, OpTime::parseFromBSON(lastOp)); // If we already have what we need then return. if (stopOpTime == startOpTime) diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index e776e0f9d41..8fadaeba90b 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -323,7 +323,7 @@ void syncFixUp(OperationContext* txn, // we have items we are writing that aren't from a point-in-time. thus best not to come // online until we get to that point in freshness. - OpTime minValid = extractOpTime(newMinValid); + OpTime minValid = fassertStatusOK(28774, OpTime::parseFromBSON(newMinValid)); log() << "minvalid=" << minValid; setMinValid(txn, minValid); @@ -426,7 +426,7 @@ void syncFixUp(OperationContext* txn, if (newMinValid.isEmpty()) { err = "can't get minvalid from sync source"; } else { - OpTime minValid = extractOpTime(newMinValid); + OpTime minValid = fassertStatusOK(28775, OpTime::parseFromBSON(newMinValid)); log() << "minvalid=" << minValid; setMinValid(txn, minValid); } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 65edf1cf5c6..7ce82364723 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -431,7 +431,7 @@ void SyncTail::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTime) // Check if we reached the end const BSONObj currentOp = ops.back(); - const OpTime currentOpTime = extractOpTime(currentOp); + const OpTime currentOpTime = fassertStatusOK(28772, OpTime::parseFromBSON(currentOp)); // When we reach the end return this batch if (currentOpTime == endOpTime) { @@ -591,7 +591,7 @@ void SyncTail::oplogApplication() { // Set minValid to the last op to be applied in this next batch. // This will cause this node to go into RECOVERING state // if we should crash and restart before updating the oplog - setMinValid(&txn, extractOpTime(lastOp)); + setMinValid(&txn, fassertStatusOK(28773, OpTime::parseFromBSON(lastOp))); multiApply(&txn, ops, &_prefetcherPool, |