summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2016-03-09 10:31:47 -0500
committermatt dannenberg <matt.dannenberg@10gen.com>2016-03-15 04:50:53 -0400
commitb8dec1f8d1849ab61b5075821f3ab4343aeecc7a (patch)
treebaa41354b935863bda0546c342b40cd1b8a7a649
parentbd26c0dbb66ec1ea46b7fc891dde6f7f8a166d50 (diff)
downloadmongo-b8dec1f8d1849ab61b5075821f3ab4343aeecc7a.tar.gz
SERVER-22858 generate batches in DataReplicator using the TryPopAndWaitForMore logic
-rw-r--r--src/mongo/db/repl/SConscript11
-rw-r--r--src/mongo/db/repl/data_replicator.cpp89
-rw-r--r--src/mongo/db/repl/data_replicator.h10
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp82
-rw-r--r--src/mongo/db/repl/oplog.cpp3
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp26
-rw-r--r--src/mongo/db/repl/oplog_entry.h11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp4
9 files changed, 183 insertions, 56 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 3c99b2d5f4c..84908049939 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -206,10 +206,16 @@ env.Library(
)
env.Library(
+ target='oplog_entry',
+ source=[
+ 'oplog_entry.cpp',
+ ],
+)
+
+env.Library(
target='sync_tail',
source=[
'sync_tail.cpp',
- 'oplog_entry.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
@@ -217,6 +223,7 @@ env.Library(
'$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ 'oplog_entry',
'repl_coordinator_global',
],
LIBDEPS_TAGS=[
@@ -673,9 +680,9 @@ env.Library(
target='applier',
source=[
'applier.cpp',
- 'oplog_entry.cpp',
],
LIBDEPS=[
+ 'oplog_entry',
'replication_executor',
],
)
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 3554a86476f..feb54538c08 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -54,7 +54,6 @@
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/queue.h"
#include "mongo/util/scopeguard.h"
-#include "mongo/util/stacktrace.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -529,6 +528,7 @@ DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor*
uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime);
uassert(ErrorCodes::BadValue, "invalid setFollowerMode function", _opts.setFollowerMode);
+ uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay);
uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector);
}
@@ -1025,13 +1025,75 @@ void DataReplicator::_doNextActions_Steady_inlock() {
}
}
-Operations DataReplicator::_getNextApplierBatch_inlock() {
- // Return a new batch of ops to apply.
- // TODO: limit the batch like SyncTail::tryPopAndWaitForMore
+StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
+ const int slaveDelaySecs = durationCount<Seconds>(_opts.getSlaveDelay());
+ const unsigned int slaveDelayBoundary = static_cast<unsigned int>(time(0) - slaveDelaySecs);
+
+ size_t totalBytes = 0;
Operations ops;
BSONObj op;
- while (_oplogBuffer.tryPop(op)) {
- ops.push_back(OplogEntry(op));
+
+ // Return a new batch of ops to apply.
+ // A batch may consist of:
+ // * at most "replBatchLimitOperations" OplogEntries
+ // * at most "replBatchLimitBytes" worth of OplogEntries
+ // * only OplogEntries from before the slaveDelay point
+ // * a single command OplogEntry (including index builds, which appear to be inserts)
+ // * consequently, commands bound the previous batch to be in a batch of their own
+ while (_oplogBuffer.peek(op)) {
+ auto entry = OplogEntry(op);
+
+ // Check for ops that must be processed one at a time.
+ if (entry.isCommand() ||
+ // Index builds are achieved through the use of an insert op, not a command op.
+ // The following line is the same as what the insert code uses to detect an index build.
+ (entry.hasNamespace() && entry.getCollectionName() == "system.indexes")) {
+ if (ops.empty()) {
+ // Apply commands one-at-a-time.
+ ops.push_back(std::move(entry));
+ _oplogBuffer.tryPop(op);
+ invariant(entry == OplogEntry(op));
+ }
+
+ // Otherwise, apply what we have so far and come back for the command.
+ return ops;
+ }
+
+ // Check for oplog version change. If it is absent, its value is one.
+ if (entry.getVersion() != OplogEntry::kOplogVersion) {
+ std::string message = str::stream()
+ << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
+ << entry.getVersion() << " in oplog entry: " << entry.raw;
+ severe() << message;
+ return {ErrorCodes::BadValue, message};
+ }
+
+ // Apply replication batch limits.
+ if (ops.size() >= _opts.replBatchLimitOperations) {
+ return ops;
+ }
+ if (totalBytes + entry.raw.objsize() > _opts.replBatchLimitBytes) {
+ return ops;
+ }
+
+ // Check slaveDelay boundary.
+ if (slaveDelaySecs > 0) {
+ const unsigned int opTimestampSecs = op["ts"].timestamp().getSecs();
+
+ // Stop the batch as the lastOp is too new to be applied. If we continue
+ // on, we can get ops that are way ahead of the delay and this will
+ // make this thread sleep longer when handleSlaveDelay is called
+ // and apply ops much sooner than we like.
+ if (opTimestampSecs > slaveDelayBoundary) {
+ return ops;
+ }
+ }
+
+ // Add op to buffer.
+ ops.push_back(entry);
+ totalBytes += entry.raw.objsize();
+ _oplogBuffer.tryPop(op);
+ invariant(entry == OplogEntry(op));
}
return ops;
}
@@ -1154,8 +1216,19 @@ Status DataReplicator::_scheduleApplyBatch() {
Status DataReplicator::_scheduleApplyBatch_inlock() {
if (!_applierPaused && !_applierActive) {
_applierActive = true;
- const Operations ops = _getNextApplierBatch_inlock();
- invariant(ops.size());
+ auto batchStatus = _getNextApplierBatch_inlock();
+ if (!batchStatus.isOK()) {
+ return batchStatus.getStatus();
+ }
+ const Operations ops = batchStatus.getValue();
+ if (ops.empty()) {
+ _applierActive = false;
+ auto status = _exec->scheduleWorkAt(_exec->now() + Seconds(1),
+ [this](const CallbackArgs&) { _doNextActions(); });
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ }
invariant(_opts.applierFn);
invariant(!(_applier && _applier->isActive()));
return _scheduleApplyBatch_inlock(ops);
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 6d7c45d2286..f6503abb8bf 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -110,12 +110,19 @@ struct DataReplicatorOptions {
/** Function to sets this node into a specific follower mode. */
using SetFollowerModeFn = stdx::function<bool(const MemberState&)>;
+ /** Function to get this node's slaveDelay. */
+ using GetSlaveDelayFn = stdx::function<Seconds()>;
+
// Error and retry values
Milliseconds syncSourceRetryWait{1000};
Milliseconds initialSyncRetryWait{1000};
Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10};
Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};
+ // Batching settings.
+ size_t replBatchLimitBytes = 512 * 1024 * 1024;
+ size_t replBatchLimitOperations = 5000;
+
// Replication settings
NamespaceString localOplogNS = NamespaceString("local.oplog.rs");
NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs");
@@ -131,6 +138,7 @@ struct DataReplicatorOptions {
GetMyLastOptimeFn getMyLastOptime;
SetMyLastOptimeFn setMyLastOptime;
SetFollowerModeFn setFollowerMode;
+ GetSlaveDelayFn getSlaveDelay;
SyncSourceSelector* syncSourceSelector = nullptr;
std::string toString() const {
@@ -230,7 +238,7 @@ private:
Timestamp _applyUntil(Timestamp);
void _pauseApplier();
- Operations _getNextApplierBatch_inlock();
+ StatusWith<Operations> _getNextApplierBatch_inlock();
void _onApplyBatchFinish(const CallbackArgs&,
const TimestampStatus&,
const Operations&,
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 27dd810716c..b8199ebf9dd 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -199,6 +199,7 @@ protected:
_memberState = state;
return true;
};
+ options.getSlaveDelay = [this]() { return Seconds(0); };
options.syncSourceSelector = this;
try {
_dr.reset(new DataReplicator(options, &(getReplExecutor())));
@@ -332,12 +333,11 @@ protected:
const long long cursorId = cmdElem.numberLong();
if (isGetMore && cursorId == 1LL) {
// process getmore requests from the oplog fetcher
- auto respBSON =
- fromjson(str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
- " , nextBatch:[{ts:Timestamp(" << ++c
- << ",1), h:1, ns:'test.a', v:2, op:'u', o2:{_id:" << c
- << "}, o:{$set:{a:1}}}"
- "]}}");
+ auto respBSON = fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
+ " , nextBatch:[{ts:Timestamp(" << ++c
+ << ",1), h:1, ns:'test.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'u', o2:{_id:" << c << "}, o:{$set:{a:1}}}]}}");
net->scheduleResponse(
noi,
net->now(),
@@ -412,14 +412,14 @@ TEST_F(InitialSyncTest, Complete) {
const std::vector<BSONObj> responses = {
// get latest oplog ts
fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
// oplog fetcher find
fromjson(
- "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
// Clone Start
// listDatabases
fromjson("{ok:1, databases:[{name:'a'}]}"),
@@ -429,10 +429,10 @@ TEST_F(InitialSyncTest, Complete) {
"{name:'a', options:{}} "
"]}}"),
// listIndexes:a
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
- "]}}"),
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:" << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
// find:a
fromjson(
"{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
@@ -441,9 +441,9 @@ TEST_F(InitialSyncTest, Complete) {
// Clone Done
// get latest oplog ts
fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, c:1}}]}}"),
// Applier starts ...
};
startSync();
@@ -465,14 +465,14 @@ TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
const std::vector<BSONObj> responses = {
// get latest oplog ts
fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
// oplog fetcher find
fromjson(
- "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'u', o2:{_id:1}, o:{$set:{a:1}}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'u', o2:{_id:1}, o:{$set:{a:1}}}]}}"),
// Clone Start
// listDatabases
fromjson("{ok:1, databases:[{name:'a'}]}"),
@@ -482,18 +482,18 @@ TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
"{name:'a', options:{}} "
"]}}"),
// listIndexes:a
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
- "]}}"),
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:" << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
// find:a -- empty
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"),
// Clone Done
// get latest oplog ts
fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, c:1}}]}}"),
// Applier starts ...
// missing doc fetch -- find:a {_id:1}
fromjson(
@@ -541,14 +541,14 @@ TEST_F(InitialSyncTest, FailsOnClone) {
const std::vector<BSONObj> responses = {
// get latest oplog ts
fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
// oplog fetcher find
fromjson(
- "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
- "]}}"),
+ str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
// Clone Start
// listDatabases
fromjson("{ok:0}")};
@@ -864,7 +864,8 @@ TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) {
TEST_F(SteadyStateTest, PauseDataReplicator) {
auto operationToApply = BSON("op"
<< "a"
- << "ts" << Timestamp(Seconds(123), 0));
+ << "v" << OplogEntry::kOplogVersion << "ts"
+ << Timestamp(Seconds(123), 0));
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
@@ -945,7 +946,8 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
TEST_F(SteadyStateTest, ApplyOneOperation) {
auto operationToApply = BSON("op"
<< "a"
- << "ts" << Timestamp(Seconds(123), 0));
+ << "v" << OplogEntry::kOplogVersion << "ts"
+ << Timestamp(Seconds(123), 0));
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index dcada5fd460..72cd946e601 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -105,7 +105,6 @@ using std::vector;
namespace repl {
std::string rsOplogName = "local.oplog.rs";
std::string masterSlaveOplogName = "local.oplog.$main";
-int OPLOG_VERSION = 2;
MONGO_FP_DECLARE(disableSnapshotting);
@@ -292,7 +291,7 @@ unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
if (optime.getTerm() != -1)
b.append("t", optime.getTerm());
b.append("h", hashNew);
- b.append("v", OPLOG_VERSION);
+ b.append("v", OplogEntry::kOplogVersion);
b.append("op", opstr);
b.append("ns", nss.ns());
if (fromMigrate)
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 5c9922b3873..75bba6782b4 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -32,9 +32,13 @@
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/namespace_string.h"
+
namespace mongo {
namespace repl {
+const int OplogEntry::kOplogVersion = 2;
+
OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
for (auto elem : raw) {
const auto name = elem.fieldNameStringData();
@@ -44,6 +48,8 @@ OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
opType = elem.valuestrsafe();
} else if (name == "o2") {
o2 = elem;
+ } else if (name == "ts") {
+ ts = elem;
} else if (name == "v") {
version = elem;
} else if (name == "o") {
@@ -52,6 +58,26 @@ OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
}
}
+bool OplogEntry::isCommand() const {
+ return opType[0] == 'c';
+}
+
+bool OplogEntry::hasNamespace() const {
+ return !ns.empty();
+}
+
+int OplogEntry::getVersion() const {
+ return version.eoo() ? 1 : version.Int();
+}
+
+Seconds OplogEntry::getTimestampSecs() const {
+ return Seconds(ts.timestamp().getSecs());
+}
+
+StringData OplogEntry::getCollectionName() const {
+ return nsToCollectionSubstring(ns);
+}
+
std::string OplogEntry::toString() const {
return raw.toString();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 4172c9659d5..08fd1c8d95a 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -32,6 +32,7 @@
namespace mongo {
namespace repl {
+
/**
* A parsed oplog entry.
*
@@ -42,14 +43,21 @@ namespace repl {
* All StringData members are guaranteed to be NUL terminated.
*/
struct OplogEntry {
+ // Current oplog version, should be the value of the v field in all oplog entries.
+ static const int kOplogVersion;
+
explicit OplogEntry(const BSONObj& raw);
// This member is not parsed from the BSON and is instead populated by fillWriterVectors.
bool isForCappedCollection = false;
+ bool isCommand() const;
+ bool hasNamespace() const;
+ int getVersion() const;
+ Seconds getTimestampSecs() const;
+ StringData getCollectionName() const;
std::string toString() const;
-
BSONObj raw; // Owned.
StringData ns = "";
@@ -58,6 +66,7 @@ struct OplogEntry {
BSONElement version;
BSONElement o;
BSONElement o2;
+ BSONElement ts;
};
std::ostream& operator<<(std::ostream& s, const OplogEntry& o);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 32f0fabb098..f158120254a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -65,6 +65,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/connection_pool_stats.h"
@@ -194,7 +195,9 @@ DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCo
[replCoord](const OpTime& opTime) { replCoord->setMyLastAppliedOpTime(opTime); };
options.setFollowerMode =
[replCoord](const MemberState& newState) { return replCoord->setFollowerMode(newState); };
+ options.getSlaveDelay = [replCoord]() { return replCoord->getSlaveDelaySecs(); };
options.syncSourceSelector = replCoord;
+ options.replBatchLimitBytes = dur::UncommittedBytesLimit;
return options;
}
} // namespace
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 95cd8624fea..cdee4896fab 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -882,8 +882,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
else
curVersion = entry.version.Int();
- if (curVersion != OPLOG_VERSION) {
- severe() << "expected oplog version " << OPLOG_VERSION << " but found version "
+ if (curVersion != OplogEntry::kOplogVersion) {
+ severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
<< curVersion << " in oplog entry: " << op;
fassertFailedNoTrace(18820);
}