summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/data_replicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/data_replicator.cpp')
-rw-r--r--src/mongo/db/repl/data_replicator.cpp89
1 files changed, 81 insertions, 8 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 3554a86476f..feb54538c08 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -54,7 +54,6 @@
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/queue.h"
#include "mongo/util/scopeguard.h"
-#include "mongo/util/stacktrace.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -529,6 +528,7 @@ DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor*
uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid setFollowerMode function", _opts.setFollowerMode);
+ uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay);
uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector);
}
@@ -1025,13 +1025,75 @@ void DataReplicator::_doNextActions_Steady_inlock() {
}
}
-Operations DataReplicator::_getNextApplierBatch_inlock() {
- // Return a new batch of ops to apply.
- // TODO: limit the batch like SyncTail::tryPopAndWaitForMore
+StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
+ const int slaveDelaySecs = durationCount<Seconds>(_opts.getSlaveDelay());
+ const unsigned int slaveDelayBoundary = static_cast<unsigned int>(time(0) - slaveDelaySecs);
+
+ size_t totalBytes = 0;
Operations ops;
BSONObj op;
- while (_oplogBuffer.tryPop(op)) {
- ops.push_back(OplogEntry(op));
+
+ // Return a new batch of ops to apply.
+ // A batch may consist of:
+ // * at most "replBatchLimitOperations" OplogEntries
+ // * at most "replBatchLimitBytes" worth of OplogEntries
+ // * only OplogEntries from before the slaveDelay point
+ // * a single command OplogEntry (including index builds, which appear to be inserts)
+ // * consequently, commands bound the previous batch to be in a batch of their own
+ while (_oplogBuffer.peek(op)) {
+ auto entry = OplogEntry(op);
+
+ // Check for ops that must be processed one at a time.
+ if (entry.isCommand() ||
+ // Index builds are achieved through the use of an insert op, not a command op.
+ // The following line is the same as what the insert code uses to detect an index build.
+ (entry.hasNamespace() && entry.getCollectionName() == "system.indexes")) {
+ if (ops.empty()) {
+ // Apply commands one-at-a-time.
+ ops.push_back(std::move(entry));
+ _oplogBuffer.tryPop(op);
+ invariant(entry == OplogEntry(op));
+ }
+
+ // Otherwise, apply what we have so far and come back for the command.
+ return ops;
+ }
+
+ // Check for oplog version change. If it is absent, its value is one.
+ if (entry.getVersion() != OplogEntry::kOplogVersion) {
+ std::string message = str::stream()
+ << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
+ << entry.getVersion() << " in oplog entry: " << entry.raw;
+ severe() << message;
+ return {ErrorCodes::BadValue, message};
+ }
+
+ // Apply replication batch limits.
+ if (ops.size() >= _opts.replBatchLimitOperations) {
+ return ops;
+ }
+ if (totalBytes + entry.raw.objsize() > _opts.replBatchLimitBytes) {
+ return ops;
+ }
+
+ // Check slaveDelay boundary.
+ if (slaveDelaySecs > 0) {
+ const unsigned int opTimestampSecs = op["ts"].timestamp().getSecs();
+
+ // Stop the batch as the lastOp is too new to be applied. If we continue
+ // on, we can get ops that are way ahead of the delay and this will
+ // make this thread sleep longer when handleSlaveDelay is called
+ // and apply ops much sooner than we like.
+ if (opTimestampSecs > slaveDelayBoundary) {
+ return ops;
+ }
+ }
+
+ // Add op to buffer.
+ ops.push_back(entry);
+ totalBytes += entry.raw.objsize();
+ _oplogBuffer.tryPop(op);
+ invariant(entry == OplogEntry(op));
}
return ops;
}
@@ -1154,8 +1216,19 @@ Status DataReplicator::_scheduleApplyBatch() {
Status DataReplicator::_scheduleApplyBatch_inlock() {
if (!_applierPaused && !_applierActive) {
_applierActive = true;
- const Operations ops = _getNextApplierBatch_inlock();
- invariant(ops.size());
+ auto batchStatus = _getNextApplierBatch_inlock();
+ if (!batchStatus.isOK()) {
+ return batchStatus.getStatus();
+ }
+ const Operations ops = batchStatus.getValue();
+ if (ops.empty()) {
+ _applierActive = false;
+ auto status = _exec->scheduleWorkAt(_exec->now() + Seconds(1),
+ [this](const CallbackArgs&) { _doNextActions(); });
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ }
invariant(_opts.applierFn);
invariant(!(_applier && _applier->isActive()));
return _scheduleApplyBatch_inlock(ops);