summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-08-12 13:24:35 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2015-08-12 17:19:31 -0400
commit2482d4387b6a0fcdf825e07d7f5260041a89f3f6 (patch)
tree165b54668d3d7589e71c702db330f40109e89227
parent7e8fb8df8eb69c7817a8fd00b78ae2f617d9f10d (diff)
downloadmongo-2482d4387b6a0fcdf825e07d7f5260041a89f3f6.tar.gz
SERVER-19855 Parsing/serialization logic for OpTime
This change pulls the OpTime serialization/deserialization logic to be part of the class.
-rw-r--r--src/mongo/db/repl/bgsync.cpp4
-rw-r--r--src/mongo/db/repl/minvalid.cpp2
-rw-r--r--src/mongo/db/repl/oplog.cpp27
-rw-r--r--src/mongo/db/repl/oplog.h5
-rw-r--r--src/mongo/db/repl/oplogreader.cpp2
-rw-r--r--src/mongo/db/repl/optime.cpp32
-rw-r--r--src/mongo/db/repl/optime.h14
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp2
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp2
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp4
-rw-r--r--src/mongo/db/repl/sync_tail.cpp4
11 files changed, 62 insertions, 36 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index cd82cdf5059..19b25760397 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -93,7 +93,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat
"we are ahead of the sync source, will try to roll back");
}
BSONObj o = result.getValue();
- OpTime opTime = extractOpTime(o);
+ OpTime opTime = fassertStatusOK(28778, OpTime::parseFromBSON(o));
long long hash = o["h"].numberLong();
if (opTime != lastOpTimeFetched || hash != lastHashFetched) {
return Status(ErrorCodes::OplogStartMissing,
@@ -461,7 +461,7 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
{
stdx::unique_lock<stdx::mutex> lock(_mutex);
_lastFetchedHash = o["h"].numberLong();
- _lastOpTimeFetched = extractOpTime(o);
+ _lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromBSON(o));
LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched;
}
}
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index b14966486fe..5584ea54658 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -111,7 +111,7 @@ OpTime getMinValid(OperationContext* txn) {
BSONObj mv;
bool found = Helpers::getSingleton(txn, minvalidNS, mv);
if (found) {
- return extractOpTime(mv);
+ return fassertStatusOK(28771, OpTime::parseFromBSON(mv));
}
return OpTime();
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index aa412056939..33b19800f8d 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -121,9 +121,6 @@ stdx::condition_variable newTimestampNotifier;
static std::string _oplogCollectionName;
-const std::string kTimestampFieldName = "ts";
-const std::string kTermFieldName = "t";
-
// so we can fail the same way
void checkOplogInsert(StatusWith<RecordId> result) {
massert(17322,
@@ -318,11 +315,8 @@ void _logOp(OperationContext* txn,
*/
BSONObjBuilder b(256);
- b.append(kTimestampFieldName, slot.first.getTimestamp());
- // Don't add term in protocol version 0.
- if (slot.first.getTerm() != OpTime::kProtocolVersionV0Term) {
- b.append(kTermFieldName, slot.first.getTerm());
- }
+
+ slot.first.append(&b);
b.append("h", slot.second);
b.append("v", OPLOG_VERSION);
b.append("op", opstr);
@@ -391,7 +385,7 @@ OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
const BSONObj& op = *it;
- const OpTime optime = extractOpTime(op);
+ const OpTime optime = fassertStatusOK(28779, OpTime::parseFromBSON(op));
checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false));
@@ -908,23 +902,14 @@ void setNewTimestamp(const Timestamp& newTime) {
newTimestampNotifier.notify_all();
}
-OpTime extractOpTime(const BSONObj& op) {
- const Timestamp ts = op[kTimestampFieldName].timestamp();
- long long term;
- // Default to -1 if the term is absent.
- fassert(28696,
- bsonExtractIntegerFieldWithDefault(
- op, kTermFieldName, OpTime::kProtocolVersionV0Term, &term));
- return OpTime(ts, term);
-}
-
void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) {
DBDirectClient c(txn);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
LOG(1) << "replSet setting last Timestamp";
- setNewTimestamp(lastOp[kTimestampFieldName].timestamp());
+ const OpTime opTime = fassertStatusOK(28696, OpTime::parseFromBSON(lastOp));
+ setNewTimestamp(opTime.getTimestamp());
}
}
@@ -1021,7 +1006,7 @@ void SnapshotThread::run() {
continue; // oplog is completely empty.
const auto op = record->data.releaseToBson();
- opTimeOfSnapshot = extractOpTime(op);
+ opTimeOfSnapshot = fassertStatusOK(28780, OpTime::parseFromBSON(op));
invariant(!opTimeOfSnapshot.isNull());
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 5696d5ac713..f17d7916f99 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -141,11 +141,6 @@ void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS);
*/
void setNewTimestamp(const Timestamp& newTime);
-/*
- * Extract the OpTime from log entry.
- */
-OpTime extractOpTime(const BSONObj& op);
-
/**
* Detects the current replication mode and sets the "_oplogCollectionName" accordingly.
*/
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 06f2da78101..0a94ba73e87 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -183,7 +183,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
// Read the first (oldest) op and confirm that it's not newer than our last
// fetched op. Otherwise, we have fallen off the back of that source's oplog.
BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query()));
- OpTime remoteOldOpTime = extractOpTime(remoteOldestOp);
+ OpTime remoteOldOpTime = fassertStatusOK(28776, OpTime::parseFromBSON(remoteOldestOp));
// remoteOldOpTime may come from a very old config, so we cannot compare their terms.
if (!lastOpTimeFetched.isNull() &&
diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp
index 87cf966c1ef..31d83d91e4f 100644
--- a/src/mongo/db/repl/optime.cpp
+++ b/src/mongo/db/repl/optime.cpp
@@ -31,10 +31,18 @@
#include <string>
#include <utility>
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/repl/optime.h"
namespace mongo {
namespace repl {
+namespace {
+
+const char* kTimestampFieldName = "ts";
+const char* kTermFieldName = "t";
+
+} // namespace
OpTime::OpTime(Timestamp ts, long long term) : _timestamp(std::move(ts)), _term(term) {}
@@ -54,6 +62,30 @@ bool OpTime::isNull() const {
return _timestamp.isNull();
}
+void OpTime::append(BSONObjBuilder* builder) const {
+ builder->append(kTimestampFieldName, _timestamp);
+
+ // Don't add term in protocol version 0.
+ if (_term != kProtocolVersionV0Term) {
+ builder->append(kTermFieldName, _term);
+ }
+}
+
+StatusWith<OpTime> OpTime::parseFromBSON(const BSONObj& obj) {
+ Timestamp ts;
+ Status status = bsonExtractTimestampField(obj, kTimestampFieldName, &ts);
+ if (!status.isOK())
+ return status;
+
+ // Default to -1 if the term is absent.
+ long long term;
+ status = bsonExtractIntegerFieldWithDefault(obj, kTermFieldName, kProtocolVersionV0Term, &term);
+ if (!status.isOK())
+ return status;
+
+ return OpTime(ts, term);
+}
+
std::string OpTime::toString() const {
std::stringstream ss;
ss << "(term: " << _term << ", timestamp: " << _timestamp.toStringPretty() << ")";
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h
index 00ec1c29f97..f41b4386c65 100644
--- a/src/mongo/db/repl/optime.h
+++ b/src/mongo/db/repl/optime.h
@@ -33,6 +33,12 @@
#include "mongo/bson/timestamp.h"
namespace mongo {
+
+class BSONObj;
+class BSONObjBuilder;
+template <typename T>
+class StatusWith;
+
namespace repl {
/**
@@ -62,6 +68,14 @@ public:
long long getTerm() const;
+ /**
+ * Serializes the contents of this optime to the specified builder in the form:
+ * { ts: <timestamp>, t: term }
+ */
+ void append(BSONObjBuilder* builder) const;
+
+ static StatusWith<OpTime> parseFromBSON(const BSONObj& obj);
+
std::string toString() const;
// Returns true when this OpTime is not yet initialized.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 9203cf76f9e..18ce6ba016d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -308,7 +308,7 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(Opera
<< " entry to have type Timestamp, but found "
<< typeName(tsElement.type()));
}
- return StatusWith<OpTime>(extractOpTime(oplogEntry));
+ return OpTime::parseFromBSON(oplogEntry);
} catch (const DBException& ex) {
return StatusWith<OpTime>(ex.toStatus());
}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 8759a15e16e..d62b86f20e9 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -253,7 +253,7 @@ bool _initialSyncApplyOplog(OperationContext* ctx, repl::SyncTail& syncer, Oplog
return false;
}
- OpTime stopOpTime = extractOpTime(lastOp);
+ OpTime stopOpTime = fassertStatusOK(28777, OpTime::parseFromBSON(lastOp));
// If we already have what we need then return.
if (stopOpTime == startOpTime)
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index e776e0f9d41..8fadaeba90b 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -323,7 +323,7 @@ void syncFixUp(OperationContext* txn,
// we have items we are writing that aren't from a point-in-time. thus best not to come
// online until we get to that point in freshness.
- OpTime minValid = extractOpTime(newMinValid);
+ OpTime minValid = fassertStatusOK(28774, OpTime::parseFromBSON(newMinValid));
log() << "minvalid=" << minValid;
setMinValid(txn, minValid);
@@ -426,7 +426,7 @@ void syncFixUp(OperationContext* txn,
if (newMinValid.isEmpty()) {
err = "can't get minvalid from sync source";
} else {
- OpTime minValid = extractOpTime(newMinValid);
+ OpTime minValid = fassertStatusOK(28775, OpTime::parseFromBSON(newMinValid));
log() << "minvalid=" << minValid;
setMinValid(txn, minValid);
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 65edf1cf5c6..7ce82364723 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -431,7 +431,7 @@ void SyncTail::_applyOplogUntil(OperationContext* txn, const OpTime& endOpTime)
// Check if we reached the end
const BSONObj currentOp = ops.back();
- const OpTime currentOpTime = extractOpTime(currentOp);
+ const OpTime currentOpTime = fassertStatusOK(28772, OpTime::parseFromBSON(currentOp));
// When we reach the end return this batch
if (currentOpTime == endOpTime) {
@@ -591,7 +591,7 @@ void SyncTail::oplogApplication() {
// Set minValid to the last op to be applied in this next batch.
// This will cause this node to go into RECOVERING state
// if we should crash and restart before updating the oplog
- setMinValid(&txn, extractOpTime(lastOp));
+ setMinValid(&txn, fassertStatusOK(28773, OpTime::parseFromBSON(lastOp)));
multiApply(&txn,
ops,
&_prefetcherPool,