summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-05-03 16:23:39 -0400
committerJudah Schvimer <judah@mongodb.com>2017-05-03 16:23:39 -0400
commitfa2dcc33303dd6a2080c108e121da6984d08a9d6 (patch)
tree27856c21f7ff7c4968891bc13257362dc1df4d73 /src/mongo/db
parent1530cf54fd9db4e9e46e5fdd0b42972cd84b4c25 (diff)
downloadmongo-fa2dcc33303dd6a2080c108e121da6984d08a9d6.tar.gz
SERVER-28846 replace public variables with getters in OplogEntry
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp35
-rw-r--r--src/mongo/db/repl/oplog_entry.h34
-rw-r--r--src/mongo/db/repl/sync_tail.cpp40
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp50
4 files changed, 88 insertions, 71 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 46cba75c41c..7ec2db33ab1 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -44,54 +44,57 @@ OplogEntry::OplogEntry(BSONObj rawInput) : raw(std::move(rawInput)) {
raw = raw.copy();
}
+ BSONElement version;
for (auto elem : raw) {
const auto name = elem.fieldNameStringData();
if (name == "ns") {
- ns = elem.valuestrsafe();
+ _ns = NamespaceString(elem.valuestrsafe());
} else if (name == "op") {
- opType = elem.valuestrsafe();
+ _opType = elem.valuestrsafe();
} else if (name == "o2") {
- o2 = elem;
+ _o2 = elem.Obj();
} else if (name == "ts") {
- ts = elem;
+ _ts = elem.timestamp();
} else if (name == "v") {
version = elem;
} else if (name == "o") {
- o = elem;
+ _o = elem.Obj();
}
}
+
+ _version = version.eoo() ? 1 : version.numberInt();
}
bool OplogEntry::isCommand() const {
- return opType[0] == 'c';
+ return getOpType()[0] == 'c';
}
bool OplogEntry::isCrudOpType() const {
- switch (opType[0]) {
+ switch (getOpType()[0]) {
case 'd':
case 'i':
case 'u':
- return opType[1] == 0;
+ return getOpType()[1] == 0;
}
return false;
}
bool OplogEntry::hasNamespace() const {
- return !ns.empty();
+ return !getNamespace().isEmpty();
}
int OplogEntry::getVersion() const {
- return version.eoo() ? 1 : version.Int();
+ return _version;
}
BSONElement OplogEntry::getIdElement() const {
invariant(isCrudOpType());
- switch (opType[0]) {
+ switch (getOpType()[0]) {
case 'u':
- return o2.Obj()["_id"];
+ return getObject2()["_id"];
case 'd':
case 'i':
- return o.Obj()["_id"];
+ return getObject()["_id"];
}
MONGO_UNREACHABLE;
}
@@ -100,12 +103,8 @@ OpTime OplogEntry::getOpTime() const {
return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw));
}
-Seconds OplogEntry::getTimestampSecs() const {
- return Seconds(ts.timestamp().getSecs());
-}
-
StringData OplogEntry::getCollectionName() const {
- return nsToCollectionSubstring(ns);
+ return getNamespace().coll();
}
std::string OplogEntry::toString() const {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index f7c59d32c69..4e7b3d67839 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -30,6 +30,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
namespace mongo {
@@ -65,13 +66,34 @@ struct OplogEntry {
BSONObj raw; // Owned.
- StringData ns = "";
- StringData opType = "";
+ // TODO: Remove these when we add the IDL.
+ const NamespaceString& getNamespace() const {
+ return _ns;
+ }
- BSONElement version;
- BSONElement o;
- BSONElement o2;
- BSONElement ts;
+ StringData getOpType() const {
+ return _opType;
+ }
+
+ Timestamp getTimestamp() const {
+ return _ts;
+ }
+
+ const BSONObj& getObject() const {
+ return _o;
+ }
+
+ const BSONObj& getObject2() const {
+ return _o2;
+ }
+
+private:
+ NamespaceString _ns;
+ StringData _opType = "";
+ int _version;
+ Timestamp _ts;
+ BSONObj _o;
+ BSONObj _o2;
};
std::ostream& operator<<(std::ostream& s, const OplogEntry& o);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index acfe9915aa6..54364c9c839 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -583,7 +583,7 @@ void fillWriterVectors(OperationContext* opCtx,
CachedCollectionProperties collPropertiesCache;
for (auto&& op : *ops) {
- StringMapTraits::HashedKey hashedNs(op.ns);
+ StringMapTraits::HashedKey hashedNs(op.getNamespace().ns());
uint32_t hash = hashedNs.hash();
if (op.isCrudOpType()) {
@@ -602,7 +602,7 @@ void fillWriterVectors(OperationContext* opCtx,
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
- if (op.opType == "i" && collProperties.isCapped) {
+ if (op.getOpType() == "i" && collProperties.isCapped) {
// Mark capped collection ops before storing them to ensure we do not attempt to
// bulk insert them.
op.isForCappedCollection = true;
@@ -871,14 +871,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
if (!entry.raw.isEmpty()) {
// check for oplog version change
- int curVersion = 0;
- if (entry.version.eoo()) {
- // missing version means version 1
- curVersion = 1;
- } else {
- curVersion = entry.version.Int();
- }
-
+ int curVersion = entry.getVersion();
if (curVersion != OplogEntry::kOplogVersion) {
severe() << "expected oplog version " << OplogEntry::kOplogVersion
<< " but found version " << curVersion
@@ -887,8 +880,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
}
}
- if (limits.slaveDelayLatestTimestamp &&
- entry.ts.timestampTime() > *limits.slaveDelayLatestTimestamp) {
+ auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
+ if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) {
ops->pop_back(); // Don't do this op yet.
if (ops->empty()) {
@@ -900,11 +893,11 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
}
// Check for ops that must be processed one at a time.
- if (entry.raw.isEmpty() || // sentinel that network queue is drained.
- (entry.opType[0] == 'c') || // commands.
+ if (entry.raw.isEmpty() || // sentinel that network queue is drained.
+ (entry.getOpType()[0] == 'c') || // commands.
// Index builds are achieved through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index build.
- (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
+ (!entry.getNamespace().isEmpty() && entry.getNamespace().coll() == "system.indexes")) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
_networkQueue->consume(opCtx);
@@ -1070,7 +1063,9 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
if (oplogEntryPointers->size() > 1) {
std::stable_sort(oplogEntryPointers->begin(),
oplogEntryPointers->end(),
- [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; });
+ [](const OplogEntry* l, const OplogEntry* r) {
+ return l->getNamespace() < r->getNamespace();
+ });
}
// This function is only called in steady state replication.
@@ -1084,7 +1079,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
oplogEntriesIterator != oplogEntryPointers->end();
++oplogEntriesIterator) {
auto entry = *oplogEntriesIterator;
- if (entry->opType[0] == 'i' && !entry->isForCappedCollection &&
+ if (entry->getOpType()[0] == 'i' && !entry->isForCappedCollection &&
oplogEntriesIterator > doNotGroupBeforePoint) {
// Attempt to group inserts if possible.
std::vector<BSONObj> toInsert;
@@ -1094,10 +1089,11 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
oplogEntriesIterator + 1,
oplogEntryPointers->end(),
[&](const OplogEntry* nextEntry) {
- return nextEntry->opType[0] != 'i' || // Must be an insert.
- nextEntry->ns != entry->ns || // Must be the same namespace.
+ return nextEntry->getOpType()[0] != 'i' || // Must be an insert.
+ nextEntry->getNamespace() !=
+ entry->getNamespace() || // Must be the same namespace.
// Must not create too large an object.
- (batchSize += nextEntry->o.Obj().objsize()) > insertVectorMaxBytes ||
+ (batchSize += nextEntry->getObject().objsize()) > insertVectorMaxBytes ||
++batchCount >= 64; // Or have too many entries.
});
@@ -1117,7 +1113,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
for (auto groupingIterator = oplogEntriesIterator;
groupingIterator != endOfGroupableOpsIterator;
++groupingIterator) {
- insertArrayBuilder.append((*groupingIterator)->o.Obj());
+ insertArrayBuilder.append((*groupingIterator)->getObject());
}
insertArrayBuilder.done();
@@ -1280,7 +1276,7 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
ON_BLOCK_EXIT([&] { workerPool->join(); });
- storage->setOplogDeleteFromPoint(opCtx, ops.front().ts.timestamp());
+ storage->setOplogDeleteFromPoint(opCtx, ops.front().getTimestamp());
scheduleWritesToOplog(opCtx, workerPool, ops);
fillWriterVectors(opCtx, &ops, &writerVectors);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index b9853a776e5..9a83faa52d4 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -718,9 +718,9 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
auto insertOp1b = makeOp(nss1);
auto insertOp2a = makeOp(nss2);
auto insertOp2b = makeOp(nss2);
- MultiApplier::Operations operationsApplied;
+ std::vector<BSONObj> operationsApplied;
auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(OplogEntry(op));
+ operationsApplied.push_back(op.copy());
return Status::OK();
};
@@ -729,26 +729,26 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply));
ASSERT_EQUALS(4U, operationsApplied.size());
- ASSERT_EQUALS(createOp1, operationsApplied[0]);
- ASSERT_EQUALS(createOp2, operationsApplied[1]);
+ ASSERT_BSONOBJ_EQ(createOp1.raw, operationsApplied[0]);
+ ASSERT_BSONOBJ_EQ(createOp2.raw, operationsApplied[1]);
// Check grouped insert operations in namespace "nss1".
- ASSERT_EQUALS(insertOp1a.getOpTime(), operationsApplied[2].getOpTime());
- ASSERT_EQUALS(insertOp1a.ns, operationsApplied[2].ns);
- ASSERT_EQUALS(BSONType::Array, operationsApplied[2].o.type());
- auto group1 = operationsApplied[2].o.Array();
+ ASSERT_EQUALS(insertOp1a.getOpTime(), OpTime::parseFromOplogEntry(operationsApplied[2]));
+ ASSERT_EQUALS(insertOp1a.getNamespace().ns(), operationsApplied[2]["ns"].valuestrsafe());
+ ASSERT_EQUALS(BSONType::Array, operationsApplied[2]["o"].type());
+ auto group1 = operationsApplied[2]["o"].Array();
ASSERT_EQUALS(2U, group1.size());
- ASSERT_BSONOBJ_EQ(insertOp1a.o.Obj(), group1[0].Obj());
- ASSERT_BSONOBJ_EQ(insertOp1b.o.Obj(), group1[1].Obj());
+ ASSERT_BSONOBJ_EQ(insertOp1a.getObject(), group1[0].Obj());
+ ASSERT_BSONOBJ_EQ(insertOp1b.getObject(), group1[1].Obj());
// Check grouped insert operations in namespace "nss2".
- ASSERT_EQUALS(insertOp2a.getOpTime(), operationsApplied[3].getOpTime());
- ASSERT_EQUALS(insertOp2a.ns, operationsApplied[3].ns);
- ASSERT_EQUALS(BSONType::Array, operationsApplied[3].o.type());
- auto group2 = operationsApplied[3].o.Array();
+ ASSERT_EQUALS(insertOp2a.getOpTime(), OpTime::parseFromOplogEntry(operationsApplied[3]));
+ ASSERT_EQUALS(insertOp2a.getNamespace().ns(), operationsApplied[3]["ns"].valuestrsafe());
+ ASSERT_EQUALS(BSONType::Array, operationsApplied[3]["o"].type());
+ auto group2 = operationsApplied[3]["o"].Array();
ASSERT_EQUALS(2U, group2.size());
- ASSERT_BSONOBJ_EQ(insertOp2a.o.Obj(), group2[0].Obj());
- ASSERT_BSONOBJ_EQ(insertOp2b.o.Obj(), group2[1].Obj());
+ ASSERT_BSONOBJ_EQ(insertOp2a.getObject(), group2[0].Obj());
+ ASSERT_BSONOBJ_EQ(insertOp2b.getObject(), group2[1].Obj());
}
TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
@@ -770,9 +770,9 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
MultiApplier::Operations operationsToApply;
operationsToApply.push_back(createOp);
std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply));
- MultiApplier::Operations operationsApplied;
+ std::vector<BSONObj> operationsApplied;
auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) {
- operationsApplied.push_back(OplogEntry(op));
+ operationsApplied.push_back(op.copy());
return Status::OK();
};
@@ -785,21 +785,21 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) {
// multiSyncApply should combine operations as follows:
// {create}, {grouped_insert}, {insert_(limit+1)}
ASSERT_EQUALS(3U, operationsApplied.size());
- ASSERT_EQUALS(createOp, operationsApplied[0]);
+ ASSERT_BSONOBJ_EQ(createOp.raw, operationsApplied[0]);
const auto& groupedInsertOp = operationsApplied[1];
- ASSERT_EQUALS(insertOps.front().getOpTime(), groupedInsertOp.getOpTime());
- ASSERT_EQUALS(insertOps.front().ns, groupedInsertOp.ns);
- ASSERT_EQUALS(BSONType::Array, groupedInsertOp.o.type());
- auto groupedInsertDocuments = groupedInsertOp.o.Array();
+ ASSERT_EQUALS(insertOps.front().getOpTime(), OpTime::parseFromOplogEntry(groupedInsertOp));
+ ASSERT_EQUALS(insertOps.front().getNamespace().ns(), groupedInsertOp["ns"].valuestrsafe());
+ ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type());
+ auto groupedInsertDocuments = groupedInsertOp["o"].Array();
ASSERT_EQUALS(limit, groupedInsertDocuments.size());
for (std::size_t i = 0; i < limit; ++i) {
const auto& insertOp = insertOps[i];
- ASSERT_BSONOBJ_EQ(insertOp.o.Obj(), groupedInsertDocuments[i].Obj());
+ ASSERT_BSONOBJ_EQ(insertOp.getObject(), groupedInsertDocuments[i].Obj());
}
// (limit + 1)-th insert operations should not be included in group of first (limit) inserts.
- ASSERT_EQUALS(insertOps.back(), operationsApplied[2]);
+ ASSERT_BSONOBJ_EQ(insertOps.back().raw, operationsApplied[2]);
}
TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) {