summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp59
1 files changed, 4 insertions, 55 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index fd54b52aa40..b79f3540343 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1445,62 +1445,11 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
return Operations();
}
- std::uint32_t totalBytes = 0;
- Operations ops;
- BSONObj op;
-
- // Return a new batch of ops to apply.
- // A batch may consist of:
- // * at most "replBatchLimitOperations" OplogEntries
- // * at most "replBatchLimitBytes" worth of OplogEntries
- // * only OplogEntries from before the slaveDelay point
- // * a single command OplogEntry (including index builds, which appear to be inserts)
- // * consequently, commands bound the previous batch to be in a batch of their own
+ // Access common batching logic in OplogApplier using passthrough function in
+ // DataReplicatorExternalState.
auto opCtx = makeOpCtx();
- while (_oplogBuffer->peek(opCtx.get(), &op)) {
- auto entry = OplogEntry(op);
-
- // Check for oplog version change. If it is absent, its value is one.
- if (entry.getVersion() != OplogEntry::kOplogVersion) {
- std::string message = str::stream()
- << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
- << entry.getVersion() << " in oplog entry: " << redact(entry.toBSON());
- severe() << message;
- return {ErrorCodes::BadValue, message};
- }
-
- // Commands must be processed one at a time. The only exception to this is applyOps because
- // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is
- // safe to batch applyOps commands with CRUD operations when reading from the oplog buffer.
- if (entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) {
- if (ops.empty()) {
- // Apply commands one-at-a-time.
- ops.push_back(std::move(entry));
- BSONObj opToPopAndDiscard;
- invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard));
- dassert(ops.back() == OplogEntry(opToPopAndDiscard));
- }
-
- // Otherwise, apply what we have so far and come back for the command.
- return std::move(ops);
- }
-
- // Apply replication batch limits.
- if (ops.size() >= _opts.batchLimits.ops) {
- return std::move(ops);
- }
- if (totalBytes + entry.getRawObjSizeBytes() >= _opts.batchLimits.bytes) {
- return std::move(ops);
- }
-
- // Add op to buffer.
- ops.push_back(std::move(entry));
- totalBytes += entry.getRawObjSizeBytes();
- BSONObj opToPopAndDiscard;
- invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard));
- dassert(ops.back() == OplogEntry(opToPopAndDiscard));
- }
- return std::move(ops);
+ return _dataReplicatorExternalState->getNextApplierBatch(
+ opCtx.get(), _oplogBuffer.get(), _opts.batchLimits);
}
StatusWith<HostAndPort> InitialSyncer::_chooseSyncSource_inlock() {