diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 2 |
4 files changed, 23 insertions, 19 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index c08fe1c93b1..59e7908c916 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -932,15 +932,8 @@ void InitialSyncer::_getNextApplierBatchCallback( MultiApplier::ApplyOperationFn apply) { return _dataReplicatorExternalState->_multiApply(opCtx, ops, apply); }; - const auto lastEntry = ops.back().raw; - const auto opTimeWithHashStatus = AbstractOplogFetcher::parseOpTimeWithHash(lastEntry); - status = opTimeWithHashStatus.getStatus(); - if (!status.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); - return; - } - - auto lastApplied = opTimeWithHashStatus.getValue(); + const auto& lastEntry = ops.back(); + OpTimeWithHash lastApplied(lastEntry.getHash(), lastEntry.getOpTime()); auto numApplied = ops.size(); MultiApplier::CallbackFn onCompletionFn = [=](const Status& s) { return _multiApplierCallback(s, lastApplied, numApplied, onCompletionGuard); @@ -1456,13 +1449,13 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() { // * consequently, commands bound the previous batch to be in a batch of their own auto opCtx = makeOpCtx(); while (_oplogBuffer->peek(opCtx.get(), &op)) { - auto entry = OplogEntry(std::move(op)); + auto entry = OplogEntry(op); // Check for oplog version change. If it is absent, its value is one. if (entry.getVersion() != OplogEntry::kOplogVersion) { std::string message = str::stream() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " - << entry.getVersion() << " in oplog entry: " << redact(entry.raw); + << entry.getVersion() << " in oplog entry: " << redact(entry.toBSON()); severe() << message; return {ErrorCodes::BadValue, message}; } @@ -1476,8 +1469,9 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() { if (ops.empty()) { // Apply commands one-at-a-time. ops.push_back(std::move(entry)); - invariant(_oplogBuffer->tryPop(opCtx.get(), &op)); - dassert(SimpleBSONObjComparator::kInstance.evaluate(ops.back().raw == op)); + BSONObj opToPopAndDiscard; + invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard)); + dassert(ops.back() == OplogEntry(opToPopAndDiscard)); } // Otherwise, apply what we have so far and come back for the command. @@ -1488,13 +1482,13 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() { if (ops.size() >= _opts.replBatchLimitOperations) { return std::move(ops); } - if (totalBytes + entry.raw.objsize() >= _opts.replBatchLimitBytes) { + if (totalBytes + entry.getRawObjSizeBytes() >= _opts.replBatchLimitBytes) { return std::move(ops); } // Check slaveDelay boundary. if (slaveDelaySecs > 0) { - const unsigned int opTimestampSecs = op["ts"].timestamp().getSecs(); + const auto opTimestampSecs = entry.getTimestamp().getSecs(); const unsigned int slaveDelayBoundary = static_cast<unsigned int>(time(0) - slaveDelaySecs); @@ -1509,9 +1503,10 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() { // Add op to buffer. ops.push_back(std::move(entry)); - totalBytes += ops.back().raw.objsize(); - invariant(_oplogBuffer->tryPop(opCtx.get(), &op)); - dassert(SimpleBSONObjComparator::kInstance.evaluate(ops.back().raw == op)); + totalBytes += entry.getRawObjSizeBytes(); + BSONObj opToPopAndDiscard; + invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard)); + dassert(ops.back() == OplogEntry(opToPopAndDiscard)); } return std::move(ops); } diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index dcc24a1c114..f9f4a11896f 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -220,6 +220,10 @@ OplogEntry::CommandType OplogEntry::getCommandType() const { return _commandType; } +int OplogEntry::getRawObjSizeBytes() const { + return raw.objsize(); +} + OpTime OplogEntry::getOpTime() const { long long term = OpTime::kUninitializedTerm; if (getTerm()) { diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 985384e4ecf..ae21d23b446 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -108,6 +108,11 @@ public: CommandType getCommandType() const; /** + * Returns the size of the original document used to create this OplogEntry. + */ + int getRawObjSizeBytes() const; + + /** * Returns the OpTime of the oplog entry. */ OpTime getOpTime() const; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index b3ae1fe4a5b..e082f5cb52e 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -149,7 +149,7 @@ public: _batch.emplace_back(std::move(obj)); } void pop_back() { - _bytes -= back().raw.objsize(); + _bytes -= back().getRawObjSizeBytes(); _batch.pop_back(); } |