summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-10-30 15:49:49 -0400
committerBenety Goh <benety@mongodb.com>2018-01-16 13:11:50 -0500
commit1378f8ec42068913c5bdc5927bbe86d5aed08814 (patch)
tree4f979a14496d8953868c39df7291dc9e6434a0b3 /src
parentb5ebe8a5492c4f5e33970c0f885b9ac51460b9dc (diff)
downloadmongo-1378f8ec42068913c5bdc5927bbe86d5aed08814.tar.gz
SERVER-29200 add OplogEntry::getRawObjSizeBytes()
remove references to OplogEntry::raw from SyncTail::OpQueue and InitialSyncer
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp31
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp4
-rw-r--r--src/mongo/db/repl/oplog_entry.h5
-rw-r--r--src/mongo/db/repl/sync_tail.h2
4 files changed, 23 insertions, 19 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index c08fe1c93b1..59e7908c916 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -932,15 +932,8 @@ void InitialSyncer::_getNextApplierBatchCallback(
MultiApplier::ApplyOperationFn apply) {
return _dataReplicatorExternalState->_multiApply(opCtx, ops, apply);
};
- const auto lastEntry = ops.back().raw;
- const auto opTimeWithHashStatus = AbstractOplogFetcher::parseOpTimeWithHash(lastEntry);
- status = opTimeWithHashStatus.getStatus();
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- }
-
- auto lastApplied = opTimeWithHashStatus.getValue();
+ const auto& lastEntry = ops.back();
+ OpTimeWithHash lastApplied(lastEntry.getHash(), lastEntry.getOpTime());
auto numApplied = ops.size();
MultiApplier::CallbackFn onCompletionFn = [=](const Status& s) {
return _multiApplierCallback(s, lastApplied, numApplied, onCompletionGuard);
@@ -1456,13 +1449,13 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
// * consequently, commands bound the previous batch to be in a batch of their own
auto opCtx = makeOpCtx();
while (_oplogBuffer->peek(opCtx.get(), &op)) {
- auto entry = OplogEntry(std::move(op));
+ auto entry = OplogEntry(op);
// Check for oplog version change. If it is absent, its value is one.
if (entry.getVersion() != OplogEntry::kOplogVersion) {
std::string message = str::stream()
<< "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
- << entry.getVersion() << " in oplog entry: " << redact(entry.raw);
+ << entry.getVersion() << " in oplog entry: " << redact(entry.toBSON());
severe() << message;
return {ErrorCodes::BadValue, message};
}
@@ -1476,8 +1469,9 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
if (ops.empty()) {
// Apply commands one-at-a-time.
ops.push_back(std::move(entry));
- invariant(_oplogBuffer->tryPop(opCtx.get(), &op));
- dassert(SimpleBSONObjComparator::kInstance.evaluate(ops.back().raw == op));
+ BSONObj opToPopAndDiscard;
+ invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard));
+ dassert(ops.back() == OplogEntry(opToPopAndDiscard));
}
// Otherwise, apply what we have so far and come back for the command.
@@ -1488,13 +1482,13 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
if (ops.size() >= _opts.replBatchLimitOperations) {
return std::move(ops);
}
- if (totalBytes + entry.raw.objsize() >= _opts.replBatchLimitBytes) {
+ if (totalBytes + entry.getRawObjSizeBytes() >= _opts.replBatchLimitBytes) {
return std::move(ops);
}
// Check slaveDelay boundary.
if (slaveDelaySecs > 0) {
- const unsigned int opTimestampSecs = op["ts"].timestamp().getSecs();
+ const auto opTimestampSecs = entry.getTimestamp().getSecs();
const unsigned int slaveDelayBoundary =
static_cast<unsigned int>(time(0) - slaveDelaySecs);
@@ -1509,9 +1503,10 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
// Add op to buffer.
ops.push_back(std::move(entry));
- totalBytes += ops.back().raw.objsize();
- invariant(_oplogBuffer->tryPop(opCtx.get(), &op));
- dassert(SimpleBSONObjComparator::kInstance.evaluate(ops.back().raw == op));
+ totalBytes += entry.getRawObjSizeBytes();
+ BSONObj opToPopAndDiscard;
+ invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard));
+ dassert(ops.back() == OplogEntry(opToPopAndDiscard));
}
return std::move(ops);
}
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index dcc24a1c114..f9f4a11896f 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -220,6 +220,10 @@ OplogEntry::CommandType OplogEntry::getCommandType() const {
return _commandType;
}
+int OplogEntry::getRawObjSizeBytes() const {
+ return raw.objsize();
+}
+
OpTime OplogEntry::getOpTime() const {
long long term = OpTime::kUninitializedTerm;
if (getTerm()) {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 985384e4ecf..ae21d23b446 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -108,6 +108,11 @@ public:
CommandType getCommandType() const;
/**
+ * Returns the size of the original document used to create this OplogEntry.
+ */
+ int getRawObjSizeBytes() const;
+
+ /**
* Returns the OpTime of the oplog entry.
*/
OpTime getOpTime() const;
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index b3ae1fe4a5b..e082f5cb52e 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -149,7 +149,7 @@ public:
_batch.emplace_back(std::move(obj));
}
void pop_back() {
- _bytes -= back().raw.objsize();
+ _bytes -= back().getRawObjSizeBytes();
_batch.pop_back();
}