diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 59 |
1 files changed, 4 insertions, 55 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index fd54b52aa40..b79f3540343 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -1445,62 +1445,11 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() { return Operations(); } - std::uint32_t totalBytes = 0; - Operations ops; - BSONObj op; - - // Return a new batch of ops to apply. - // A batch may consist of: - // * at most "replBatchLimitOperations" OplogEntries - // * at most "replBatchLimitBytes" worth of OplogEntries - // * only OplogEntries from before the slaveDelay point - // * a single command OplogEntry (including index builds, which appear to be inserts) - // * consequently, commands bound the previous batch to be in a batch of their own + // Access common batching logic in OplogApplier using passthrough function in + // DataReplicatorExternalState. auto opCtx = makeOpCtx(); - while (_oplogBuffer->peek(opCtx.get(), &op)) { - auto entry = OplogEntry(op); - - // Check for oplog version change. If it is absent, its value is one. - if (entry.getVersion() != OplogEntry::kOplogVersion) { - std::string message = str::stream() - << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " - << entry.getVersion() << " in oplog entry: " << redact(entry.toBSON()); - severe() << message; - return {ErrorCodes::BadValue, message}; - } - - // Commands must be processed one at a time. The only exception to this is applyOps because - // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is - // safe to batch applyOps commands with CRUD operations when reading from the oplog buffer. - if (entry.isCommand() && entry.getCommandType() != OplogEntry::CommandType::kApplyOps) { - if (ops.empty()) { - // Apply commands one-at-a-time. - ops.push_back(std::move(entry)); - BSONObj opToPopAndDiscard; - invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard)); - dassert(ops.back() == OplogEntry(opToPopAndDiscard)); - } - - // Otherwise, apply what we have so far and come back for the command. - return std::move(ops); - } - - // Apply replication batch limits. - if (ops.size() >= _opts.batchLimits.ops) { - return std::move(ops); - } - if (totalBytes + entry.getRawObjSizeBytes() >= _opts.batchLimits.bytes) { - return std::move(ops); - } - - // Add op to buffer. - ops.push_back(std::move(entry)); - totalBytes += entry.getRawObjSizeBytes(); - BSONObj opToPopAndDiscard; - invariant(_oplogBuffer->tryPop(opCtx.get(), &opToPopAndDiscard)); - dassert(ops.back() == OplogEntry(opToPopAndDiscard)); - } - return std::move(ops); + return _dataReplicatorExternalState->getNextApplierBatch( + opCtx.get(), _oplogBuffer.get(), _opts.batchLimits); } StatusWith<HostAndPort> InitialSyncer::_chooseSyncSource_inlock() { |