diff options
Diffstat (limited to 'src/mongo/db/repl/data_replicator.cpp')
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 89 |
1 files changed, 81 insertions, 8 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 3554a86476f..feb54538c08 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -54,7 +54,6 @@ #include "mongo/util/mongoutils/str.h" #include "mongo/util/queue.h" #include "mongo/util/scopeguard.h" -#include "mongo/util/stacktrace.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" @@ -529,6 +528,7 @@ DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor* uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime); uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime); uassert(ErrorCodes::BadValue, "invalid setFollowerMode function", _opts.setFollowerMode); + uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay); uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector); } @@ -1025,13 +1025,75 @@ void DataReplicator::_doNextActions_Steady_inlock() { } } -Operations DataReplicator::_getNextApplierBatch_inlock() { - // Return a new batch of ops to apply. - // TODO: limit the batch like SyncTail::tryPopAndWaitForMore +StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { + const int slaveDelaySecs = durationCount<Seconds>(_opts.getSlaveDelay()); + const unsigned int slaveDelayBoundary = static_cast<unsigned int>(time(0) - slaveDelaySecs); + + size_t totalBytes = 0; Operations ops; BSONObj op; - while (_oplogBuffer.tryPop(op)) { - ops.push_back(OplogEntry(op)); + + // Return a new batch of ops to apply. + // A batch may consist of: + // * at most "replBatchLimitOperations" OplogEntries + // * at most "replBatchLimitBytes" worth of OplogEntries + // * only OplogEntries from before the slaveDelay point + // * a single command OplogEntry (including index builds, which appear to be inserts) + // * consequently, commands bound the previous batch to be in a batch of their own + while (_oplogBuffer.peek(op)) { + auto entry = OplogEntry(op); + + // Check for ops that must be processed one at a time. + if (entry.isCommand() || + // Index builds are achieved through the use of an insert op, not a command op. + // The following line is the same as what the insert code uses to detect an index build. + (entry.hasNamespace() && entry.getCollectionName() == "system.indexes")) { + if (ops.empty()) { + // Apply commands one-at-a-time. + ops.push_back(std::move(entry)); + _oplogBuffer.tryPop(op); + invariant(entry == OplogEntry(op)); + } + + // Otherwise, apply what we have so far and come back for the command. + return ops; + } + + // Check for oplog version change. If it is absent, its value is one. + if (entry.getVersion() != OplogEntry::kOplogVersion) { + std::string message = str::stream() + << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " + << entry.getVersion() << " in oplog entry: " << entry.raw; + severe() << message; + return {ErrorCodes::BadValue, message}; + } + + // Apply replication batch limits. + if (ops.size() >= _opts.replBatchLimitOperations) { + return ops; + } + if (totalBytes + entry.raw.objsize() > _opts.replBatchLimitBytes) { + return ops; + } + + // Check slaveDelay boundary. + if (slaveDelaySecs > 0) { + const unsigned int opTimestampSecs = op["ts"].timestamp().getSecs(); + + // Stop the batch as the lastOp is too new to be applied. If we continue + // on, we can get ops that are way ahead of the delay and this will + // make this thread sleep longer when handleSlaveDelay is called + // and apply ops much sooner than we like. + if (opTimestampSecs > slaveDelayBoundary) { + return ops; + } + } + + // Add op to buffer. + ops.push_back(entry); + totalBytes += entry.raw.objsize(); + _oplogBuffer.tryPop(op); + invariant(entry == OplogEntry(op)); } return ops; } @@ -1154,8 +1216,19 @@ Status DataReplicator::_scheduleApplyBatch() { Status DataReplicator::_scheduleApplyBatch_inlock() { if (!_applierPaused && !_applierActive) { _applierActive = true; - const Operations ops = _getNextApplierBatch_inlock(); - invariant(ops.size()); + auto batchStatus = _getNextApplierBatch_inlock(); + if (!batchStatus.isOK()) { + return batchStatus.getStatus(); + } + const Operations ops = batchStatus.getValue(); + if (ops.empty()) { + _applierActive = false; + auto status = _exec->scheduleWorkAt(_exec->now() + Seconds(1), + [this](const CallbackArgs&) { _doNextActions(); }); + if (!status.isOK()) { + return status.getStatus(); + } + } invariant(_opts.applierFn); invariant(!(_applier && _applier->isActive())); return _scheduleApplyBatch_inlock(ops); |