From b8dec1f8d1849ab61b5075821f3ab4343aeecc7a Mon Sep 17 00:00:00 2001
From: matt dannenberg
Date: Wed, 9 Mar 2016 10:31:47 -0500
Subject: SERVER-22858 generate batches in DataReplicator using the TryPopAndWaitForMore logic

---
 src/mongo/db/repl/SConscript                       | 11 ++-
 src/mongo/db/repl/data_replicator.cpp              | 89 ++++++++++++++++++++--
 src/mongo/db/repl/data_replicator.h                | 10 ++-
 src/mongo/db/repl/data_replicator_test.cpp         | 82 ++++++++++----------
 src/mongo/db/repl/oplog.cpp                        |  3 +-
 src/mongo/db/repl/oplog_entry.cpp                  | 26 +++++++
 src/mongo/db/repl/oplog_entry.h                    | 11 ++-
 src/mongo/db/repl/replication_coordinator_impl.cpp |  3 +
 src/mongo/db/repl/sync_tail.cpp                    |  4 +-
 9 files changed, 183 insertions(+), 56 deletions(-)

diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 3c99b2d5f4c..84908049939 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -205,11 +205,17 @@ env.Library(
     ],
 )
 
+env.Library(
+    target='oplog_entry',
+    source=[
+        'oplog_entry.cpp',
+    ],
+)
+
 env.Library(
     target='sync_tail',
     source=[
         'sync_tail.cpp',
-        'oplog_entry.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
@@ -217,6 +223,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
         '$BUILD_DIR/mongo/db/curop',
         '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+        'oplog_entry',
         'repl_coordinator_global',
     ],
     LIBDEPS_TAGS=[
@@ -673,9 +680,9 @@ env.Library(
     target='applier',
     source=[
         'applier.cpp',
-        'oplog_entry.cpp',
     ],
     LIBDEPS=[
+        'oplog_entry',
         'replication_executor',
     ],
 )
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 3554a86476f..feb54538c08 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -54,7 +54,6 @@
 #include "mongo/util/mongoutils/str.h"
 #include "mongo/util/queue.h"
 #include "mongo/util/scopeguard.h"
-#include "mongo/util/stacktrace.h"
 #include "mongo/util/time_support.h"
 #include "mongo/util/timer.h"
 
@@ -529,6 +528,7 @@ DataReplicator::DataReplicator(DataReplicatorOptions opts, ReplicationExecutor*
     uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
     uassert(ErrorCodes::BadValue, "invalid setMyLastOptime function", _opts.setMyLastOptime);
     uassert(ErrorCodes::BadValue, "invalid setFollowerMode function", _opts.setFollowerMode);
+    uassert(ErrorCodes::BadValue, "invalid getSlaveDelay function", _opts.getSlaveDelay);
     uassert(ErrorCodes::BadValue, "invalid sync source selector", _opts.syncSourceSelector);
 }
 
@@ -1025,13 +1025,75 @@ void DataReplicator::_doNextActions_Steady_inlock() {
     }
 }
 
-Operations DataReplicator::_getNextApplierBatch_inlock() {
-    // Return a new batch of ops to apply.
-    // TODO: limit the batch like SyncTail::tryPopAndWaitForMore
+StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
+    const int slaveDelaySecs = durationCount<Seconds>(_opts.getSlaveDelay());
+    const unsigned int slaveDelayBoundary = static_cast<unsigned int>(time(0) - slaveDelaySecs);
+
+    size_t totalBytes = 0;
     Operations ops;
     BSONObj op;
-    while (_oplogBuffer.tryPop(op)) {
-        ops.push_back(OplogEntry(op));
+
+    // Return a new batch of ops to apply.
+    // A batch may consist of:
+    //      * at most "replBatchLimitOperations" OplogEntries
+    //      * at most "replBatchLimitBytes" worth of OplogEntries
+    //      * only OplogEntries from before the slaveDelay point
+    //      * a single command OplogEntry (including index builds, which appear to be inserts)
+    //        * consequently, commands bound the previous batch to be in a batch of their own
+    while (_oplogBuffer.peek(op)) {
+        auto entry = OplogEntry(op);
+
+        // Check for ops that must be processed one at a time.
+        if (entry.isCommand() ||
+            // Index builds are achieved through the use of an insert op, not a command op.
+            // The following line is the same as what the insert code uses to detect an index build.
+            (entry.hasNamespace() && entry.getCollectionName() == "system.indexes")) {
+            if (ops.empty()) {
+                // Apply commands one-at-a-time.
+                ops.push_back(std::move(entry));
+                _oplogBuffer.tryPop(op);
+                invariant(entry == OplogEntry(op));
+            }
+
+            // Otherwise, apply what we have so far and come back for the command.
+            return ops;
+        }
+
+        // Check for oplog version change. If it is absent, its value is one.
+        if (entry.getVersion() != OplogEntry::kOplogVersion) {
+            std::string message = str::stream()
+                << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
+                << entry.getVersion() << " in oplog entry: " << entry.raw;
+            severe() << message;
+            return {ErrorCodes::BadValue, message};
+        }
+
+        // Apply replication batch limits.
+        if (ops.size() >= _opts.replBatchLimitOperations) {
+            return ops;
+        }
+        if (totalBytes + entry.raw.objsize() > _opts.replBatchLimitBytes) {
+            return ops;
+        }
+
+        // Check slaveDelay boundary.
+        if (slaveDelaySecs > 0) {
+            const unsigned int opTimestampSecs = op["ts"].timestamp().getSecs();
+
+            // Stop the batch as the lastOp is too new to be applied. If we continue
+            // on, we can get ops that are way ahead of the delay and this will
+            // make this thread sleep longer when handleSlaveDelay is called
+            // and apply ops much sooner than we like.
+            if (opTimestampSecs > slaveDelayBoundary) {
+                return ops;
+            }
+        }
+
+        // Add op to buffer.
+        ops.push_back(entry);
+        totalBytes += entry.raw.objsize();
+        _oplogBuffer.tryPop(op);
+        invariant(entry == OplogEntry(op));
     }
     return ops;
 }
@@ -1154,8 +1216,19 @@ Status DataReplicator::_scheduleApplyBatch() {
 Status DataReplicator::_scheduleApplyBatch_inlock() {
     if (!_applierPaused && !_applierActive) {
         _applierActive = true;
-        const Operations ops = _getNextApplierBatch_inlock();
-        invariant(ops.size());
+        auto batchStatus = _getNextApplierBatch_inlock();
+        if (!batchStatus.isOK()) {
+            return batchStatus.getStatus();
+        }
+        const Operations ops = batchStatus.getValue();
+        if (ops.empty()) {
+            _applierActive = false;
+            auto status = _exec->scheduleWorkAt(_exec->now() + Seconds(1),
+                                                [this](const CallbackArgs&) { _doNextActions(); });
+            if (!status.isOK()) {
+                return status.getStatus();
+            }
+        }
         invariant(_opts.applierFn);
         invariant(!(_applier && _applier->isActive()));
         return _scheduleApplyBatch_inlock(ops);
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 6d7c45d2286..f6503abb8bf 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -110,12 +110,19 @@ struct DataReplicatorOptions {
     /** Function to sets this node into a specific follower mode. */
     using SetFollowerModeFn = stdx::function<bool(const MemberState&)>;
 
+    /** Function to get this node's slaveDelay. */
+    using GetSlaveDelayFn = stdx::function<Seconds()>;
+
     // Error and retry values
     Milliseconds syncSourceRetryWait{1000};
     Milliseconds initialSyncRetryWait{1000};
     Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10};
     Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};
 
+    // Batching settings.
+    size_t replBatchLimitBytes = 512 * 1024 * 1024;
+    size_t replBatchLimitOperations = 5000;
+
     // Replication settings
     NamespaceString localOplogNS = NamespaceString("local.oplog.rs");
     NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs");
@@ -131,6 +138,7 @@ struct DataReplicatorOptions {
     GetMyLastOptimeFn getMyLastOptime;
     SetMyLastOptimeFn setMyLastOptime;
     SetFollowerModeFn setFollowerMode;
+    GetSlaveDelayFn getSlaveDelay;
     SyncSourceSelector* syncSourceSelector = nullptr;
 
     std::string toString() const {
@@ -230,7 +238,7 @@ private:
     Timestamp _applyUntil(Timestamp);
     void _pauseApplier();
 
-    Operations _getNextApplierBatch_inlock();
+    StatusWith<Operations> _getNextApplierBatch_inlock();
     void _onApplyBatchFinish(const CallbackArgs&,
                              const TimestampStatus&,
                              const Operations&,
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 27dd810716c..b8199ebf9dd 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -199,6 +199,7 @@ protected:
             _memberState = state;
             return true;
         };
+        options.getSlaveDelay = [this]() { return Seconds(0); };
         options.syncSourceSelector = this;
         try {
             _dr.reset(new DataReplicator(options, &(getReplExecutor())));
@@ -332,12 +333,11 @@ protected:
         const long long cursorId = cmdElem.numberLong();
         if (isGetMore && cursorId == 1LL) {
             // process getmore requests from the oplog fetcher
-            auto respBSON =
-                fromjson(str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
-                                          " , nextBatch:[{ts:Timestamp(" << ++c
-                                       << ",1), h:1, ns:'test.a', v:2, op:'u', o2:{_id:" << c
-                                       << "}, o:{$set:{a:1}}}"
-                                          "]}}");
+            auto respBSON = fromjson(str::stream()
+                                     << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
+                                        " , nextBatch:[{ts:Timestamp(" << ++c
+                                     << ",1), h:1, ns:'test.a', v:" << OplogEntry::kOplogVersion
+                                     << ", op:'u', o2:{_id:" << c << "}, o:{$set:{a:1}}}]}}");
             net->scheduleResponse(
                 noi,
                 net->now(),
@@ -412,14 +412,14 @@ TEST_F(InitialSyncTest, Complete) {
     const std::vector<BSONObj> responses = {
         // get latest oplog ts
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, a:1}}]}}"),
         // oplog fetcher find
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, a:1}}]}}"),
         // Clone Start
         // listDatabases
         fromjson("{ok:1, databases:[{name:'a'}]}"),
         // listCollections for "a"
         fromjson(
             "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
             "{name:'a', options:{}} "
             "]}}"),
         // listIndexes:a
-        fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
-            "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
-            "]}}"),
+        fromjson(str::stream()
+                 << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+                    "{v:" << OplogEntry::kOplogVersion
+                 << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
         // find:a
         fromjson(
             "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
             "{_id:1, a:1} "
             "]}}"),
         // Clone Done
         // get latest oplog ts
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, c:1}}]}}"),
         // Applier starts ...
     };
     startSync();
@@ -465,14 +465,14 @@ TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
     const std::vector<BSONObj> responses = {
         // get latest oplog ts
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, a:1}}]}}"),
         // oplog fetcher find
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'u', o2:{_id:1}, o:{$set:{a:1}}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+                          << ", op:'u', o2:{_id:1}, o:{$set:{a:1}}}]}}"),
         // Clone Start
         // listDatabases
         fromjson("{ok:1, databases:[{name:'a'}]}"),
         // listCollections for "a"
         fromjson(
             "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
             "{name:'a', options:{}} "
             "]}}"),
         // listIndexes:a
-        fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
-            "{v:1, key:{_id:1}, name:'_id_', ns:'a.a'}"
-            "]}}"),
+        fromjson(str::stream()
+                 << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+                    "{v:" << OplogEntry::kOplogVersion
+                 << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
         // find:a -- empty
         fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"),
         // Clone Done
         // get latest oplog ts
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(2,2), h:1, ns:'b.c', v:2, op:'i', o:{_id:1, c:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, c:1}}]}}"),
         // Applier starts ...
         // missing doc fetch -- find:a {_id:1}
         fromjson(
             "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
             "{_id:1, a:1} "
             "]}}"),
@@ -541,14 +541,14 @@ TEST_F(InitialSyncTest, FailsOnClone) {
     const std::vector<BSONObj> responses = {
         // get latest oplog ts
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, a:1}}]}}"),
         // oplog fetcher find
         fromjson(
-            "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
-            "{ts:Timestamp(1,1), h:1, ns:'a.a', v:2, op:'i', o:{_id:1, a:1}}"
-            "]}}"),
+            str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+                             "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+                          << ", op:'i', o:{_id:1, a:1}}]}}"),
         // Clone Start
         // listDatabases
         fromjson("{ok:0}")};
@@ -864,7 +864,8 @@ TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) {
 TEST_F(SteadyStateTest, PauseDataReplicator) {
     auto operationToApply = BSON("op"
                                  << "a"
-                                 << "ts" << Timestamp(Seconds(123), 0));
+                                 << "v" << OplogEntry::kOplogVersion << "ts"
+                                 << Timestamp(Seconds(123), 0));
     stdx::mutex mutex;
     unittest::Barrier barrier(2U);
     Timestamp lastTimestampApplied;
@@ -945,7 +946,8 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
 TEST_F(SteadyStateTest, ApplyOneOperation) {
     auto operationToApply = BSON("op"
                                  << "a"
-                                 << "ts" << Timestamp(Seconds(123), 0));
+                                 << "v" << OplogEntry::kOplogVersion << "ts"
+                                 << Timestamp(Seconds(123), 0));
     stdx::mutex mutex;
     unittest::Barrier barrier(2U);
     Timestamp lastTimestampApplied;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index dcada5fd460..72cd946e601 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -105,7 +105,6 @@ using std::vector;
 namespace repl {
 std::string rsOplogName = "local.oplog.rs";
 std::string masterSlaveOplogName = "local.oplog.$main";
-int OPLOG_VERSION = 2;
 
 MONGO_FP_DECLARE(disableSnapshotting);
 
@@ -292,7 +291,7 @@ unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
     if (optime.getTerm() != -1)
         b.append("t", optime.getTerm());
     b.append("h", hashNew);
-    b.append("v", OPLOG_VERSION);
+    b.append("v", OplogEntry::kOplogVersion);
     b.append("op", opstr);
     b.append("ns", nss.ns());
     if (fromMigrate)
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 5c9922b3873..75bba6782b4 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -32,9 +32,13 @@
 
 #include "mongo/db/repl/oplog_entry.h"
 
+#include "mongo/db/namespace_string.h"
+
 namespace mongo {
 namespace repl {
 
+const int OplogEntry::kOplogVersion = 2;
+
 OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
     for (auto elem : raw) {
         const auto name = elem.fieldNameStringData();
@@ -44,6 +48,8 @@ OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
             opType = elem.valuestrsafe();
         } else if (name == "o2") {
             o2 = elem;
+        } else if (name == "ts") {
+            ts = elem;
         } else if (name == "v") {
             version = elem;
         } else if (name == "o") {
@@ -52,6 +58,26 @@ OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
     }
 }
 
+bool OplogEntry::isCommand() const {
+    return opType[0] == 'c';
+}
+
+bool OplogEntry::hasNamespace() const {
+    return !ns.empty();
+}
+
+int OplogEntry::getVersion() const {
+    return version.eoo() ? 1 : version.Int();
+}
+
+Seconds OplogEntry::getTimestampSecs() const {
+    return Seconds(ts.timestamp().getSecs());
+}
+
+StringData OplogEntry::getCollectionName() const {
+    return nsToCollectionSubstring(ns);
+}
+
 std::string OplogEntry::toString() const {
     return raw.toString();
 }
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 4172c9659d5..08fd1c8d95a 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -32,6 +32,7 @@
 
 namespace mongo {
 namespace repl {
+
 /**
  * A parsed oplog entry.
  *
@@ -42,13 +43,20 @@ namespace repl {
  * All StringData members are guaranteed to be NUL terminated.
  */
 struct OplogEntry {
+    // Current oplog version, should be the value of the v field in all oplog entries.
+    static const int kOplogVersion;
+
     explicit OplogEntry(const BSONObj& raw);
 
     // This member is not parsed from the BSON and is instead populated by fillWriterVectors.
     bool isForCappedCollection = false;
 
+    bool isCommand() const;
+    bool hasNamespace() const;
+    int getVersion() const;
+    Seconds getTimestampSecs() const;
+    StringData getCollectionName() const;
     std::string toString() const;
-
     BSONObj raw;  // Owned.
     StringData ns = "";
     StringData opType = "";
@@ -58,6 +66,7 @@ struct OplogEntry {
     BSONElement version;
     BSONElement o;
     BSONElement o2;
+    BSONElement ts;
 };
 
 std::ostream& operator<<(std::ostream& s, const OplogEntry& o);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 32f0fabb098..f158120254a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -65,6 +65,7 @@
 #include "mongo/db/repl/update_position_args.h"
 #include "mongo/db/repl/vote_requester.h"
 #include "mongo/db/server_options.h"
+#include "mongo/db/storage/mmap_v1/dur.h"
 #include "mongo/db/write_concern.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/executor/connection_pool_stats.h"
@@ -194,7 +195,9 @@ DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCo
         [replCoord](const OpTime& opTime) { replCoord->setMyLastAppliedOpTime(opTime); };
     options.setFollowerMode =
         [replCoord](const MemberState& newState) { return replCoord->setFollowerMode(newState); };
+    options.getSlaveDelay = [replCoord]() { return replCoord->getSlaveDelaySecs(); };
     options.syncSourceSelector = replCoord;
+    options.replBatchLimitBytes = dur::UncommittedBytesLimit;
     return options;
 }
 }  // namespace
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 95cd8624fea..cdee4896fab 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -882,8 +882,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
         else
             curVersion = entry.version.Int();
 
-        if (curVersion != OPLOG_VERSION) {
-            severe() << "expected oplog version " << OPLOG_VERSION << " but found version "
+        if (curVersion != OplogEntry::kOplogVersion) {
+            severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
                      << curVersion << " in oplog entry: " << op;
             fassertFailedNoTrace(18820);
         }
-- 
cgit v1.2.1
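
Editor's note: the sketch below is not part of the patch. It is only an illustration of the batching
rules that the new DataReplicator::_getNextApplierBatch_inlock() enforces above (an operation-count
cap, a byte cap, commands applied in a batch of their own, and a slaveDelay cutoff). The Entry
struct, nextBatch() and its parameters are hypothetical stand-ins for OplogEntry, the real method
and the DataReplicatorOptions batch settings, and a plain std::deque stands in for the real
blocking oplog buffer.

    // Illustrative sketch only -- not MongoDB code.
    #include <cstddef>
    #include <ctime>
    #include <deque>
    #include <vector>

    struct Entry {
        bool isCommand;      // commands (and system.indexes inserts) must be applied alone
        std::size_t bytes;   // serialized size of the entry
        std::time_t tsSecs;  // seconds component of the entry's "ts" field
    };

    std::vector<Entry> nextBatch(std::deque<Entry>& buffer,
                                 std::size_t limitOps,    // cf. replBatchLimitOperations
                                 std::size_t limitBytes,  // cf. replBatchLimitBytes
                                 std::time_t slaveDelaySecs) {
        const std::time_t boundary = std::time(nullptr) - slaveDelaySecs;
        std::vector<Entry> batch;
        std::size_t totalBytes = 0;

        while (!buffer.empty()) {
            const Entry& e = buffer.front();
            if (e.isCommand) {
                if (batch.empty()) {  // a command forms a batch of its own
                    batch.push_back(e);
                    buffer.pop_front();
                }
                return batch;  // otherwise flush what we have and revisit the command
            }
            if (batch.size() >= limitOps || totalBytes + e.bytes > limitBytes) {
                return batch;  // batch is full by count or by bytes
            }
            if (slaveDelaySecs > 0 && e.tsSecs > boundary) {
                return batch;  // entry is too new to apply under slaveDelay
            }
            totalBytes += e.bytes;
            batch.push_back(e);
            buffer.pop_front();
        }
        return batch;
    }

The ordering mirrors the patch: the command check runs before the size and slaveDelay checks, so a
command either opens a fresh single-entry batch or ends the batch being built, and an over-limit or
too-new entry stays in the buffer for the next call.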