author	Siyuan Zhou <siyuan.zhou@mongodb.com>	2016-05-09 17:29:44 -0400
committer	Siyuan Zhou <siyuan.zhou@mongodb.com>	2016-08-04 18:10:46 -0400
commit	953a241f6dd1541905a1b6e259140635b80238de (patch)
tree	c04703faefd3cfea5e81293fc2952d7e69991a29
parent	6e3021388b44e1c9fc3f6fd02b554fe0cc5c5c3e (diff)
download	mongo-953a241f6dd1541905a1b6e259140635b80238de.tar.gz
SERVER-23663 New primary syncs from chosen node to catch up with timeout
SERVER-23662 Implement "catch-up timeout" configuration variable
21 files changed, 842 insertions, 129 deletions
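
Note on the change below: after winning an election, a new primary now pauses in a "catch-up" phase, using the FreshnessScanner to find the freshest oplog among the other members and syncing from a chosen node before it starts accepting writes. The window is bounded by a new replica-set setting, settings.catchUpTimeoutMillis, parsed in replica_set_config.cpp; the default stays 0 (catch-up disabled) until the TODO to raise it to 5000 ms is resolved. As a rough illustration only (not part of this commit), enabling a 10-second window from the mongo shell might look like the following, mirroring the settings used by jstests/replsets/catchup.js:

    // Hypothetical operator snippet; assumes a running replica set on a build
    // that includes this commit. rs.conf() and rs.reconfig() are the standard
    // shell helpers; catchUpTimeoutMillis is the field added by this commit.
    var conf = rs.conf();
    conf.settings = conf.settings || {};
    conf.settings.catchUpTimeoutMillis = 10000;  // 0 disables primary catch-up
    rs.reconfig(conf);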
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
index beac15e6af8..3008a80f977 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
@@ -3,6 +3,7 @@ selector:
  roots:
  - jstests/replsets/*.js
  exclude_files:
+  - jstests/replsets/catchup.js
  - jstests/replsets/config_server_checks.js
  - jstests/replsets/disallow_adding_initialized_node1.js
  - jstests/replsets/disallow_adding_initialized_node2.js
@@ -14,9 +15,9 @@ selector:
  - jstests/replsets/priority_takeover_two_nodes_equal_priority.js
  - jstests/replsets/read_committed*.js
  - jstests/replsets/read_majority_two_arbs.js
+  - jstests/replsets/stepup.js
  # The combination of new bridges and PV0 can lead to an improper spanning tree in sync2.js.
  - jstests/replsets/sync2.js
-  - jstests/replsets/stepup.js

executor:
  js_test:
diff --git a/jstests/replsets/catchup.js b/jstests/replsets/catchup.js
new file mode 100644
index 00000000000..84ea333d337
--- /dev/null
+++ b/jstests/replsets/catchup.js
@@ -0,0 +1,167 @@
+// Test the catch-up behavior of new primaries.
+
+load("jstests/replsets/rslib.js");
+
+(function() {
+
+    "use strict";
+    var name = "catch_up";
+    var rst = new ReplSetTest({name: name, nodes: 3});
+
+    rst.startSet();
+    var conf = rst.getReplSetConfig();
+    conf.settings = {
+        heartbeatIntervalMillis: 500,
+        electionTimeoutMillis: 2000,
+        catchUpTimeoutMillis: 10000
+    };
+    rst.initiate(conf);
+    rst.awaitSecondaryNodes();
+
+    var primary = rst.getPrimary();
+    var primaryColl = primary.getDB("test").coll;
+
+    // Set verbosity for replication on all nodes.
+    var verbosity = {
+        "setParameter": 1,
+        "logComponentVerbosity": {
+            "replication": {"verbosity": 2},
+        }
+    };
+    rst.nodes.forEach(function(node) {
+        node.adminCommand(verbosity)
+    });
+
+    function enableFailPoint(node) {
+        jsTest.log("enable failpoint " + node.host);
+        // Disable syncing on both secondaries.
+        assert.commandWorked(
+            node.adminCommand({configureFailPoint: 'pauseRsBgSyncProducer', mode: 'alwaysOn'}),
+            'Failed to configure pauseRsBgSyncProducer failpoint.');
+        assert.commandWorked(
+            node.adminCommand({configureFailPoint: 'stopOplogFetcher', mode: 'alwaysOn'}));
+    }
+
+    function disableFailPoint(node) {
+        jsTest.log("disable failpoint " + node.host);
+        assert.commandWorked(
+            node.adminCommand({configureFailPoint: 'stopOplogFetcher', mode: 'off'}));
+        assert.commandWorked(
+            node.adminCommand({configureFailPoint: 'pauseRsBgSyncProducer', mode: 'off'}),
+            'Failed to disable pauseRsBgSyncProducer failpoint.');
+    }
+
+    function stepUp(node) {
+        assert.commandWorked(node.adminCommand({replSetStepUp: 1}));
+        return node;
+    }
+
+    function doWrites(node) {
+        for (var i = 0; i < 3; i++) {
+            assert.writeOK(node.getDB("test").foo.insert({x: i}));
+        }
+    }
+
+    function checkOpInOplog(node, op, count) {
+        node.getDB("admin").getMongo().setSlaveOk();
+        var oplog = node.getDB("local")['oplog.rs'];
+        var oplogArray = oplog.find().toArray();
+        assert.eq(oplog.count(op), count, "oplog: " + tojson(oplogArray));
+    }
+
+    function isEarlierTimestamp(ts1, ts2) {
+        if (ts1.getTime() == ts2.getTime()) {
+            return ts1.getInc() < ts2.getInc();
+        }
+        return ts1.getTime() < ts2.getTime();
+    }
+
+    function countMatchedLog(node, regex) {
+        var res = node.adminCommand({getLog: 'global'});
+        assert.commandWorked(res);
+        var count = 0;
+        res.log.forEach(function(line) {
+            if (regex.exec(line)) {
+                count += 1;
+            }
+        });
+        return count;
+    }
+
+    jsTest.log("Case 1: The primary is up-to-date after freshness scan.");
+    // Should complete transition to primary immediately.
+    rst.awaitReplication(30000, ReplSetTest.OpTimeType.LAST_DURABLE);
+    var newPrimary = stepUp(rst.getSecondary());
+    rst.waitForState(newPrimary, ReplSetTest.State.PRIMARY, 1000);
+    // Should win an election and finish the transition very quickly.
+    assert.eq(newPrimary, rst.getPrimary());
+
+    jsTest.log("Case 2: The primary needs to catch up, succeeds in time.");
+    rst.awaitReplication(30000, ReplSetTest.OpTimeType.LAST_DURABLE);
+    // Write documents that cannot be replicated to secondaries in time.
+    rst.getSecondaries().forEach(enableFailPoint);
+    doWrites(rst.getPrimary());
+    var latestOp = getLatestOp(rst.getPrimary());
+    // New primary wins immediately, but needs to catch up.
+    newPrimary = stepUp(rst.getSecondary());
+    rst.waitForState(newPrimary, ReplSetTest.State.PRIMARY, 1000);
+    // Check this node is not writable.
+    assert.eq(newPrimary.getDB("test").isMaster().ismaster, false);
+    // Disable fail point to allow replication.
+    rst.nodes.forEach(disableFailPoint);
+    // getPrimary() blocks until the primary finishes drain mode.
+    assert.eq(newPrimary, rst.getPrimary());
+    // Wait for all secondaries to catch up
+    rst.awaitReplication();
+    // Check the latest op on old primary is preserved on the new one.
+    checkOpInOplog(newPrimary, latestOp, 1);
+
+    jsTest.log("Case 3: The primary needs to catch up, fails due to timeout.");
+    rst.awaitReplication(30000, ReplSetTest.OpTimeType.LAST_DURABLE);
+    // Write documents that cannot be replicated to secondaries in time.
+    rst.getSecondaries().forEach(enableFailPoint);
+    doWrites(rst.getPrimary());
+    latestOp = getLatestOp(rst.getPrimary());
+
+    // New primary wins immediately, but needs to catch up.
+    newPrimary = stepUp(rst.getSecondary());
+    rst.waitForState(newPrimary, ReplSetTest.State.PRIMARY, 1000);
+    var latestOpOnNewPrimary = getLatestOp(newPrimary);
+    // Wait until the new primary completes the transition to primary and writes a no-op.
+    assert.soon(function() {
+        return countMatchedLog(newPrimary, /Cannot catch up oplog after becoming primary/) > 0;
+    });
+    disableFailPoint(newPrimary);
+    assert.eq(newPrimary, rst.getPrimary());
+
+    // Wait for the no-op "new primary" after winning an election, so that we know it has
+    // finished transition to primary.
+    assert.soon(function() {
+        return isEarlierTimestamp(latestOpOnNewPrimary.ts, getLatestOp(newPrimary).ts);
+    });
+    // The extra oplog entries on the old primary are not replicated to the new one.
+    checkOpInOplog(newPrimary, latestOp, 0);
+    rst.getSecondaries().forEach(disableFailPoint);
+
+    jsTest.log("Case 4: The primary needs to catch up, but has to change sync source to catch up.");
+    rst.awaitReplication(30000, ReplSetTest.OpTimeType.LAST_DURABLE);
+    // Write documents that cannot be replicated to secondaries in time.
+    rst.getSecondaries().forEach(enableFailPoint);
+    doWrites(rst.getPrimary());
+    var oldPrimary = rst.getPrimary();
+    var oldSecondaries = rst.getSecondaries();
+    latestOp = getLatestOp(oldPrimary);
+    newPrimary = stepUp(oldSecondaries[0]);
+    rst.waitForState(newPrimary, ReplSetTest.State.PRIMARY, 1000);
+    // Disable fail point on one of the other secondaries.
+    // Wait until it catches up with the old primary.
+    disableFailPoint(oldSecondaries[1]);
+    awaitOpTime(oldSecondaries[1], latestOp.ts);
+    // Shutdown the old primary
+    rst.stop(oldPrimary);
+    // Disable the failpoint, the new primary should sync from the other secondary.
+    disableFailPoint(newPrimary);
+    assert.eq(newPrimary, rst.getPrimary());
+    checkOpInOplog(newPrimary, latestOp, 1);
+
+})();
diff --git a/jstests/replsets/drain.js b/jstests/replsets/drain.js
index 74c5868b821..357a812ce69 100644
--- a/jstests/replsets/drain.js
+++ b/jstests/replsets/drain.js
@@ -20,7 +20,9 @@
            {"_id": 0, "host": nodes[0]},
            {"_id": 1, "host": nodes[1]},
            {"_id": 2, "host": nodes[2], "arbiterOnly": true}
-        ]
+        ],
+        // No primary catch-up so we focus on the drain mode.
+        "settings": {"catchUpTimeoutMillis": 0},
    });
    var primary = replSet.getPrimary();
diff --git a/jstests/replsets/server8070.js b/jstests/replsets/server8070.js
index e91e95e99a4..f8f9430f8d2 100644
--- a/jstests/replsets/server8070.js
+++ b/jstests/replsets/server8070.js
@@ -99,9 +99,9 @@ var syncingTo = member3.adminCommand({replSetGetStatus: 1}).syncingTo;
assert(syncingTo !== getHostName() + ":" + replSet.ports[1], "node 3 is syncing from node 2 :(");

jsTest.log("Pause 3's bgsync thread");
-var rsBgSyncProduceResult3 =
-    member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'alwaysOn'});
-assert.eq(1, rsBgSyncProduceResult3.ok, "member 3 rsBgSyncProduce admin command failed");
+var pauseRsBgSyncProducerResult3 =
+    member3.runCommand({configureFailPoint: 'pauseRsBgSyncProducer', mode: 'alwaysOn'});
+assert.eq(1, pauseRsBgSyncProducerResult3.ok, "member 3 pauseRsBgSyncProducer command failed");

// count documents in member 3
assert.eq(26,
@@ -123,7 +123,7 @@ assert.soon(function() {
}, "Replication member 3 did not apply ops 25-75");

jsTest.log("Start 3's bgsync thread");
-member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'off'});
+member3.runCommand({configureFailPoint: 'pauseRsBgSyncProducer', mode: 'off'});

jsTest.log("Node 3 shouldn't hit rollback");
var end = (new Date()).getTime() + 10000;
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 19249c7f864..8bbda77b638 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -108,7 +108,7 @@ size_t getSize(const BSONObj& o) {
}
}  // namespace

-MONGO_FP_DECLARE(rsBgSyncProduce);
+MONGO_FP_DECLARE(pauseRsBgSyncProducer);

// The number and time spent reading batches off the network
static TimerStats getmoreReplStats;
@@ -220,7 +220,11 @@ void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) {
void BackgroundSync::_runProducer() {
    const MemberState state = _replCoord->getMemberState();
    // Stop when the state changes to primary.
-    if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
+    //
+    // TODO(siyuan) Drain mode should imply we're the primary. Fix this condition and the one below
+    // after fixing step-down during drain mode.
+    if (!_replCoord->isCatchingUp() &&
+        (_replCoord->isWaitingForApplierToDrain() || state.primary())) {
        if (!isStopped()) {
            stop();
        }
@@ -255,6 +259,11 @@ void BackgroundSync::_runProducer() {
}

void BackgroundSync::_produce(OperationContext* txn) {
+
+    while (MONGO_FAIL_POINT(pauseRsBgSyncProducer)) {
+        sleepmillis(0);
+    }
+
    // this oplog reader does not do a handshake because we don't want the server it's syncing
    // from to track how far it has synced
    {
@@ -267,17 +276,16 @@ void BackgroundSync::_produce(OperationContext* txn) {
            return;
        }

-        if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary() ||
-            _inShutdown_inlock()) {
+        if (!_replCoord->isCatchingUp() &&
+            (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary())) {
            return;
        }
-    }

-    while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
-        sleepmillis(0);
+        if (_inShutdown_inlock()) {
+            return;
+        }
    }

-
    // find a target to sync from the last optime fetched
    OpTime lastOpTimeFetched;
    HostAndPort source;
@@ -286,15 +294,25 @@ void BackgroundSync::_produce(OperationContext* txn) {
        lastOpTimeFetched = _lastOpTimeFetched;
        _syncSourceHost = HostAndPort();
    }
+
    SyncSourceResolverResponse syncSourceResp =
        _syncSourceResolver.findSyncSource(txn, lastOpTimeFetched);

    if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
        // All (accessible) sync sources were too stale.
+        if (_replCoord->isCatchingUp()) {
+            warning() << "Too stale to catch up.";
+            log() << "Our newest OpTime : " << lastOpTimeFetched;
+            log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
+            sleepsecs(1);
+            return;
+        }
+
        error() << "too stale to catch up -- entering maintenance mode";
        log() << "Our newest OpTime : " << lastOpTimeFetched;
        log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
        log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
+
        StorageInterface::get(txn)->setMinValid(
            txn, {lastOpTimeFetched, syncSourceResp.earliestOpTimeSeen});
        auto status = _replCoord->setMaintenanceMode(true);
@@ -329,7 +347,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
        }
        lastOpTimeFetched = _lastOpTimeFetched;
        lastHashFetched = _lastFetchedHash;
-        _replCoord->signalUpstreamUpdater();
+        if (!_replCoord->isCatchingUp()) {
+            _replCoord->signalUpstreamUpdater();
+        }
    }

    // "lastFetched" not used. Already set in _enqueueDocuments.
@@ -393,6 +413,12 @@ void BackgroundSync::_produce(OperationContext* txn) {
        return;
    } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
               fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+        if (_replCoord->isCatchingUp()) {
+            warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
+            sleepsecs(1);
+            return;
+        }
+
        // Rollback is a synchronous operation that uses the task executor and may not be
        // executed inside the fetcher callback.
        const int messagingPortTags = 0;
@@ -668,7 +694,7 @@ bool BackgroundSync::shouldStopFetching() const {
        return true;
    }

-    if (_replCoord->getMemberState().primary()) {
+    if (_replCoord->getMemberState().primary() && !_replCoord->isCatchingUp()) {
        LOG(2) << "Interrupted by becoming primary while checking sync source.";
        return true;
    }
diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp
index 2623fb32b88..e3880d6cc76 100644
--- a/src/mongo/db/repl/freshness_scanner.cpp
+++ b/src/mongo/db/repl/freshness_scanner.cpp
@@ -80,6 +80,8 @@ void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& re
        OpTime lastOpTime;
        Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime);
        if (!status.isOK()) {
+            LOG(2) << "FreshnessScanner: failed to parse opTime in " << opTimesObj << " from "
+                   << request.target << causedBy(status);
            return;
        }

@@ -92,6 +94,8 @@ void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& re
        auto iter =
            std::upper_bound(_freshnessInfos.begin(), _freshnessInfos.end(), freshnessInfo, cmp);
        _freshnessInfos.insert(iter, freshnessInfo);
+        LOG(2) << "FreshnessScanner: processed response " << opTimesObj << " from "
+               << request.target;
    }
}
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index d010fdcc585..d41c905ab9a 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -37,6 +37,7 @@
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"

@@ -47,6 +48,8 @@ Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2);

namespace {

+MONGO_FP_DECLARE(stopOplogFetcher);
+
/**
 * Calculates await data timeout based on the current replica set configuration.
 */
@@ -302,6 +305,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
        return;
    }

+    // Stop fetching and return immediately on fail point.
+    if (MONGO_FAIL_POINT(stopOplogFetcher)) {
+        return;
+    }
+
    const auto& queryResponse = result.getValue();
    rpc::ReplSetMetadata metadata;
diff --git a/src/mongo/db/repl/replica_set_config.cpp b/src/mongo/db/repl/replica_set_config.cpp
index b3d0d8f4fc8..4f160d9488c 100644
--- a/src/mongo/db/repl/replica_set_config.cpp
+++ b/src/mongo/db/repl/replica_set_config.cpp
@@ -50,6 +50,8 @@ const std::string ReplicaSetConfig::kMajorityWriteConcernModeName = "$majority";
const Milliseconds ReplicaSetConfig::kDefaultHeartbeatInterval(2000);
const Seconds ReplicaSetConfig::kDefaultHeartbeatTimeoutPeriod(10);
const Milliseconds ReplicaSetConfig::kDefaultElectionTimeoutPeriod(10000);
+// TODO(siyuan): Change the default catch up timeout to 5000 milliseconds;
+const Milliseconds ReplicaSetConfig::kDefaultCatchUpTimeoutPeriod(0);
const bool ReplicaSetConfig::kDefaultChainingAllowed(true);

namespace {
@@ -76,6 +78,7 @@ const std::string kGetLastErrorDefaultsFieldName = "getLastErrorDefaults";
const std::string kGetLastErrorModesFieldName = "getLastErrorModes";
const std::string kHeartbeatIntervalFieldName = "heartbeatIntervalMillis";
const std::string kHeartbeatTimeoutFieldName = "heartbeatTimeoutSecs";
+const std::string kCatchUpTimeoutFieldName = "catchUpTimeoutMillis";
const std::string kReplicaSetIdFieldName = "replicaSetId";

}  // namespace
@@ -265,6 +268,23 @@ Status ReplicaSetConfig::_parseSettingsSubdocument(const BSONObj& settings) {
    _heartbeatTimeoutPeriod = Seconds(heartbeatTimeoutSecs);

    //
+    // Parse catchUpTimeoutMillis
+    //
+    auto notLessThanZero = stdx::bind(std::greater_equal<long long>(), stdx::placeholders::_1, 0);
+    long long catchUpTimeoutMillis;
+    Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf(
+        settings,
+        kCatchUpTimeoutFieldName,
+        durationCount<Milliseconds>(kDefaultCatchUpTimeoutPeriod),
+        notLessThanZero,
+        "catch-up timeout must be greater than or equal to 0",
+        &catchUpTimeoutMillis);
+    if (!catchUpTimeoutStatus.isOK()) {
+        return catchUpTimeoutStatus;
+    }
+    _catchUpTimeoutPeriod = Milliseconds(catchUpTimeoutMillis);
+
+    //
    // Parse chainingAllowed
    //
    Status status = bsonExtractBooleanFieldWithDefault(
@@ -763,6 +783,8 @@ BSONObj ReplicaSetConfig::toBSON() const {
                                durationCount<Seconds>(_heartbeatTimeoutPeriod));
    settingsBuilder.appendIntOrLL(kElectionTimeoutFieldName,
                                  durationCount<Milliseconds>(_electionTimeoutPeriod));
+    settingsBuilder.appendIntOrLL(kCatchUpTimeoutFieldName,
+                                  durationCount<Milliseconds>(_catchUpTimeoutPeriod));

    BSONObjBuilder gleModes(settingsBuilder.subobjStart(kGetLastErrorModesFieldName));
diff --git a/src/mongo/db/repl/replica_set_config.h b/src/mongo/db/repl/replica_set_config.h
index 7bbedf85f89..0586342df92 100644
--- a/src/mongo/db/repl/replica_set_config.h
+++ b/src/mongo/db/repl/replica_set_config.h
@@ -63,6 +63,7 @@ public:
    static const Milliseconds kDefaultElectionTimeoutPeriod;
    static const Milliseconds kDefaultHeartbeatInterval;
    static const Seconds kDefaultHeartbeatTimeoutPeriod;
+    static const Milliseconds kDefaultCatchUpTimeoutPeriod;
    static const bool kDefaultChainingAllowed;

    /**
@@ -218,6 +219,13 @@ public:
    }

    /**
+     * Gets the timeout to wait for a primary to catch up its oplog.
+     */
+    Milliseconds getCatchUpTimeoutPeriod() const {
+        return _catchUpTimeoutPeriod;
+    }
+
+    /**
     * Gets the number of votes required to win an election.
     */
    int getMajorityVoteCount() const {
@@ -374,6 +382,7 @@ private:
    Milliseconds _electionTimeoutPeriod = kDefaultElectionTimeoutPeriod;
    Milliseconds _heartbeatInterval = kDefaultHeartbeatInterval;
    Seconds _heartbeatTimeoutPeriod = kDefaultHeartbeatTimeoutPeriod;
+    Milliseconds _catchUpTimeoutPeriod = kDefaultCatchUpTimeoutPeriod;
    bool _chainingAllowed = kDefaultChainingAllowed;
    bool _writeConcernMajorityJournalDefault = false;
    int _majorityVoteCount = 0;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index afede4ea517..62ca5fe94ec 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -399,6 +399,12 @@ public:
    virtual bool isWaitingForApplierToDrain() = 0;

    /**
+     * A new primary tries to have its oplog catch up after winning an election.
+     * Return true if the coordinator is waiting for catch-up to finish.
+     */
+    virtual bool isCatchingUp() = 0;
+
+    /**
     * Signals that a previously requested pause and drain of the applier buffer
     * has completed.
     *
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 4c4aaf14e32..51db27b9dd8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
+#include "mongo/db/repl/freshness_scanner.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/last_vote.h"
@@ -146,37 +147,25 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
    return toBSON().toString();
}

-
struct ReplicationCoordinatorImpl::WaiterInfo {
-    /**
-     * Constructor takes the list of waiters and enqueues itself on the list, removing itself
-     * in the destructor.
-     */
-    WaiterInfo(std::vector<WaiterInfo*>* _list,
-               unsigned int _opID,
-               const OpTime* _opTime,
+
+    using FinishFunc = stdx::function<void()>;
+
+    WaiterInfo(unsigned int _opID,
+               const OpTime _opTime,
               const WriteConcernOptions* _writeConcern,
               stdx::condition_variable* _condVar)
-        : list(_list),
-          master(true),
-          opID(_opID),
-          opTime(_opTime),
-          writeConcern(_writeConcern),
-          condVar(_condVar) {
-        list->push_back(this);
-    }
+        : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {}

-    ~WaiterInfo() {
-        list->erase(std::remove(list->begin(), list->end(), this), list->end());
-    }
+    // When waiter is signaled, finishCallback will be called while holding replCoord _mutex
+    // since WaiterLists are protected by _mutex.
+    WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback)
+        : opTime(_opTime), finishCallback(_finishCallback) {}

    BSONObj toBSON() const {
        BSONObjBuilder bob;
        bob.append("opId", opID);
-        if (opTime) {
-            bob.append("opTime", opTime->toBSON());
-        }
-        bob.append("master", master);
+        bob.append("opTime", opTime.toBSON());
        if (writeConcern) {
            bob.append("writeConcern", writeConcern->toBSON());
        }
@@ -187,14 +176,90 @@ struct ReplicationCoordinatorImpl::WaiterInfo {
        return toBSON().toString();
    };

-    std::vector<WaiterInfo*>* list;
-    bool master;  // Set to false to indicate that stepDown was called while waiting
-    const unsigned int opID;
-    const OpTime* opTime;
-    const WriteConcernOptions* writeConcern;
-    stdx::condition_variable* condVar;
+    // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex.
+    void notify() {
+        if (condVar) {
+            condVar->notify_all();
+        }
+        if (finishCallback) {
+            finishCallback();
+        }
+    }
+
+    const unsigned int opID = 0;
+    const OpTime opTime;
+    const WriteConcernOptions* writeConcern = nullptr;
+    stdx::condition_variable* condVar = nullptr;
+    // The callback that will be called when this waiter is notified.
+    FinishFunc finishCallback = nullptr;
+};
+
+struct ReplicationCoordinatorImpl::WaiterInfoGuard {
+    /**
+     * Constructor takes the list of waiters and enqueues itself on the list, removing itself
+     * in the destructor.
+     *
+     * Usually waiters will be signaled and removed when their criteria are satisfied, but
+     * wait_until() with timeout may signal waiters earlier and this guard will remove the waiter
+     * properly.
+     *
+     * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
+     * of these without holding _mutex
+     */
+    WaiterInfoGuard(WaiterList* list,
+                    unsigned int opID,
+                    const OpTime opTime,
+                    const WriteConcernOptions* writeConcern,
+                    stdx::condition_variable* condVar)
+        : waiter(opID, opTime, writeConcern, condVar), _list(list) {
+        list->add_inlock(&waiter);
+    }
+
+    ~WaiterInfoGuard() {
+        _list->remove_inlock(&waiter);
+    }
+
+    WaiterInfo waiter;
+
+private:
+    WaiterList* _list;
};

+void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
+    _list.push_back(waiter);
+}
+
+void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
+    stdx::function<bool(WaiterType)> func) {
+    std::vector<WaiterType>::iterator it = _list.end();
+    while (true) {
+        it = std::find_if(_list.begin(), _list.end(), func);
+        if (it == _list.end()) {
+            break;
+        }
+        (*it)->notify();
+        std::swap(*it, _list.back());
+        _list.pop_back();
+    }
+}
+
+void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
+    for (auto& waiter : _list) {
+        waiter->notify();
+    }
+    _list.clear();
+}
+
+bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
+    auto it = std::find(_list.begin(), _list.end(), waiter);
+    if (it != _list.end()) {
+        std::swap(*it, _list.back());
+        _list.pop_back();
+        return true;
+    }
+    return false;
+}
+
namespace {
ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) {
    if (settings.usingReplSets()) {
@@ -594,12 +659,8 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
            return;
        }
        fassert(18823, _rsConfigState != kConfigStartingUp);
-        for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
-             it != _replicationWaiterList.end();
-             ++it) {
-            WaiterInfo* waiter = *it;
-            waiter->condVar->notify_all();
-        }
+        _replicationWaiterList.signalAndRemoveAll_inlock();
+        _opTimeWaiterList.signalAndRemoveAll_inlock();
    }

    // joining the replication executor is blocking so it must be run outside of the mutex
@@ -736,6 +797,12 @@ bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() {
    return _isWaitingForDrainToComplete;
}

+bool ReplicationCoordinatorImpl::isCatchingUp() {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    return _isCatchingUp;
+}
+
+
void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
    // This logic is a little complicated in order to avoid acquiring the global exclusive lock
    // unnecessarily.  This is important because the applier may call signalDrainComplete()
@@ -806,7 +873,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
    }

    stdx::unique_lock<stdx::mutex> lk(_mutex);
-    auto pred = [this]() { return !_isWaitingForDrainToComplete; };
+    auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
    if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
        return Status(ErrorCodes::ExceededTimeLimit,
                      "Timed out waiting to finish draining applier buffer");
@@ -1028,11 +1095,8 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
    invariant(isRollbackAllowed || mySlaveInfo->lastAppliedOpTime <= opTime);
    _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);

-    for (auto& opTimeWaiter : _opTimeWaiterList) {
-        if (*(opTimeWaiter->opTime) <= opTime) {
-            opTimeWaiter->condVar->notify_all();
-        }
-    }
+    _opTimeWaiterList.signalAndRemoveIf_inlock(
+        [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; });
}

void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -1138,9 +1202,10 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn,

        // We just need to wait for the opTime to catch up to what we need (not majority RC).
        stdx::condition_variable condVar;
-        WaiterInfo waitInfo(&_opTimeWaiterList, txn->getOpID(), &targetOpTime, nullptr, &condVar);
+        WaiterInfoGuard waitInfo(
+            &_opTimeWaiterList, txn->getOpID(), targetOpTime, nullptr, &condVar);

-        LOG(3) << "Waiting for OpTime: " << waitInfo;
+        LOG(3) << "Waiting for OpTime: " << waitInfo.waiter;
        if (txn->hasDeadline()) {
            condVar.wait_until(lock, txn->getDeadline().toSystemTimePoint());
        } else {
@@ -1334,22 +1399,9 @@ void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
        // Wake ops waiting for a new committed snapshot.
        _currentCommittedSnapshotCond.notify_all();

-        for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
-             it != _replicationWaiterList.end();
-             ++it) {
-            WaiterInfo* info = *it;
-            if (info->opID == opId) {
-                info->condVar->notify_all();
-                return;
-            }
-        }
-
-        for (auto& opTimeWaiter : _opTimeWaiterList) {
-            if (opTimeWaiter->opID == opId) {
-                opTimeWaiter->condVar->notify_all();
-                return;
-            }
-        }
+        auto hasSameOpID = [opId](WaiterInfo* waiter) { return waiter->opID == opId; };
+        _replicationWaiterList.signalAndRemoveIf_inlock(hasSameOpID);
+        _opTimeWaiterList.signalAndRemoveIf_inlock(hasSameOpID);
    }

    {
@@ -1364,16 +1416,8 @@ void ReplicationCoordinatorImpl::interruptAll() {
        // Wake ops waiting for a new committed snapshot.
        _currentCommittedSnapshotCond.notify_all();

-        for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
-             it != _replicationWaiterList.end();
-             ++it) {
-            WaiterInfo* info = *it;
-            info->condVar->notify_all();
-        }
-
-        for (auto& opTimeWaiter : _opTimeWaiterList) {
-            opTimeWaiter->condVar->notify_all();
-        }
+        _replicationWaiterList.signalAndRemoveAll_inlock();
+        _opTimeWaiterList.signalAndRemoveAll_inlock();
    }

    {
@@ -1543,7 +1587,8 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl

    // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
    stdx::condition_variable condVar;
-    WaiterInfo waitInfo(&_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar);
+    WaiterInfoGuard waitInfo(
+        &_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
    while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
        const Milliseconds elapsed{timer->millis()};

@@ -1552,7 +1597,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
            return StatusAndDuration(interruptedStatus, elapsed);
        }

-        if (!waitInfo.master) {
+        if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
            return StatusAndDuration(Status(ErrorCodes::NotMaster,
                                            "Not master anymore while waiting for replication"
                                            " - this most likely means that a step down"
@@ -2001,7 +2046,7 @@ void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* respon
        response->setLastMajorityWrite(majorityOpTime, majorityOpTime.getTimestamp().getSecs());
    }

-    if (isWaitingForApplierToDrain()) {
+    if (isWaitingForApplierToDrain() || isCatchingUp()) {
        // Report that we are secondary to ismaster callers until drain completes.
        response->setIsMaster(false);
        response->setIsSecondary(true);
@@ -2475,13 +2520,9 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
    PostMemberStateUpdateAction result;
    if (_memberState.primary() || newState.removed() || newState.rollback()) {
        // Wake up any threads blocked in awaitReplication, close connections, etc.
-        for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
-             it != _replicationWaiterList.end();
-             ++it) {
-            WaiterInfo* info = *it;
-            info->master = false;
-            info->condVar->notify_all();
-        }
+        _replicationWaiterList.signalAndRemoveAll_inlock();
+        // Wake up the optime waiter that is waiting for primary catch-up to finish.
+        _opTimeWaiterList.signalAndRemoveAll_inlock();
        _canAcceptNonLocalWrites = false;
        result = kActionCloseAllConnections;
    } else {
@@ -2580,15 +2621,20 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
                _electionId = OID::gen();
            }
            _topCoord->processWinElection(_electionId, getNextGlobalTimestamp());
-            _isWaitingForDrainToComplete = true;
+            _isCatchingUp = true;
            const PostMemberStateUpdateAction nextAction =
                _updateMemberStateFromTopologyCoordinator_inlock();
            invariant(nextAction != kActionWinElection);
            lk.unlock();
-            _externalState->signalApplierToCancelFetcher();
            _performPostMemberStateUpdateAction(nextAction);
            // Notify all secondaries of the election win.
            _scheduleElectionWinNotification();
+            lk.lock();
+            if (isV1ElectionProtocol()) {
+                _scanOpTimeForCatchUp_inlock();
+            } else {
+                _finishCatchUpOplog_inlock(true);
+            }
            break;
        }
        case kActionStartSingleNodeElection:
@@ -2602,6 +2648,99 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
    }
}

+void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
+    auto scanner = std::make_shared<FreshnessScanner>();
+    auto scanStartTime = _replExecutor.now();
+    auto evhStatus =
+        scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
+    if (evhStatus == ErrorCodes::ShutdownInProgress) {
+        _finishCatchUpOplog_inlock(true);
+        return;
+    }
+    fassertStatusOK(40254, evhStatus.getStatus());
+    long long term = _cachedTerm;
+    _replExecutor.onEvent(
+        evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
+            LockGuard lk(_mutex);
+            if (cbData.status == ErrorCodes::CallbackCanceled) {
+                _finishCatchUpOplog_inlock(true);
+                return;
+            }
+            auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
+            auto catchUpTimeout = totalTimeout - (_replExecutor.now() - scanStartTime);
+            _catchUpOplogToLatest_inlock(*scanner, catchUpTimeout, term);
+        });
+}
+
+void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessScanner& scanner,
+                                                              Milliseconds timeout,
+                                                              long long originalTerm) {
+    // On stepping down, the node doesn't update its term immediately due to SERVER-21425.
+    // Term is also checked in case the catchup timeout is so long that the node becomes primary
+    // again.
+    if (!_memberState.primary() || originalTerm != _cachedTerm) {
+        log() << "Stopped transition to primary of term " << originalTerm
+              << " because I've already stepped down.";
+        _finishCatchUpOplog_inlock(false);
+        return;
+    }
+
+    auto result = scanner.getResult();
+
+    // Cannot access any nodes within timeout.
+    if (result.size() == 0) {
+        log() << "Could not access any nodes within timeout when checking for "
+              << "additional ops to apply before finishing transition to primary. "
+              << "Will move forward with becoming primary anyway.";
+        _finishCatchUpOplog_inlock(true);
+        return;
+    }
+
+    // I'm most up-to-date as far as I know.
+    auto freshnessInfo = result.front();
+    if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
+        log() << "My optime is most up-to-date, skipping catch-up "
+              << "and completing transition to primary.";
+        _finishCatchUpOplog_inlock(true);
+        return;
+    }
+
+    // Wait for the replication level to reach the latest opTime within timeout.
+    auto latestOpTime = freshnessInfo.opTime;
+    auto finishCB = [this, latestOpTime]() {
+        if (latestOpTime > _getMyLastAppliedOpTime_inlock()) {
+            log() << "Cannot catch up oplog after becoming primary.";
+        } else {
+            log() << "Finished catch-up oplog after becoming primary.";
+        }
+
+        _finishCatchUpOplog_inlock(true);
+    };
+    auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
+
+    _opTimeWaiterList.add_inlock(waiterInfo.get());
+    auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
+        LockGuard lk(_mutex);
+        if (_opTimeWaiterList.remove_inlock(waiterInfo.get())) {
+            finishCB();
+        }
+    };
+    // Schedule the timeout callback. It may signal after we have already caught up.
+    _replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB);
+}
+
+void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
+    _isCatchingUp = false;
+    // If the node steps down during the catch-up, we don't go into drain mode.
+    if (startToDrain) {
+        _isWaitingForDrainToComplete = true;
+        // Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
+        // cancel fetcher.
+        _replExecutor.scheduleWork(
+            _wrapAsCallbackFn([this]() { _externalState->signalApplierToCancelFetcher(); }));
+    }
+}
+
Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    resultObj->append("rbid", _rbid);
@@ -2687,15 +2826,10 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& n
}

void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
-    for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
-         it != _replicationWaiterList.end();
-         ++it) {
-        WaiterInfo* info = *it;
-        if (_doneWaitingForReplication_inlock(
-                *info->opTime, SnapshotName::min(), *info->writeConcern)) {
-            info->condVar->notify_all();
-        }
-    }
+    _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) {
+        return _doneWaitingForReplication_inlock(
+            waiter->opTime, SnapshotName::min(), *waiter->writeConcern);
+    });
}

Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 480a5287dc1..798762a1494 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -73,6 +73,7 @@ namespace repl {

class ElectCmdRunner;
class FreshnessChecker;
+class FreshnessScanner;
class HandshakeArgs;
class HeartbeatResponseAction;
class LastVote;
@@ -198,6 +199,8 @@ public:

    virtual bool isWaitingForApplierToDrain() override;

+    virtual bool isCatchingUp() override;
+
    virtual void signalDrainComplete(OperationContext* txn) override;

    virtual Status waitForDrainFinish(Milliseconds timeout) override;
@@ -505,6 +508,25 @@ private:
    // Struct that holds information about clients waiting for replication.
    struct WaiterInfo;
+    struct WaiterInfoGuard;
+
+    class WaiterList {
+    public:
+        using WaiterType = WaiterInfo*;
+
+        // Adds waiter into the list. Usually, the waiter will be signaled only once and then
+        // removed.
+        void add_inlock(WaiterType waiter);
+        // Returns whether waiter is found and removed.
+        bool remove_inlock(WaiterType waiter);
+        // Signals and removes all waiters that satisfy the condition.
+        void signalAndRemoveIf_inlock(stdx::function<bool(WaiterType)> fun);
+        // Signals and removes all waiters from the list.
+        void signalAndRemoveAll_inlock();
+
+    private:
+        std::vector<WaiterType> _list;
+    };

    // Struct that holds information about nodes in this replication group, mainly used for
    // tracking replication progress for write concern satisfaction.
@@ -1113,6 +1135,27 @@ private:
     */
    executor::TaskExecutor::CallbackFn _wrapAsCallbackFn(const stdx::function<void()>& work);

+    /**
+     * Scan all nodes to find out the the latest optime in the replset, thus we know when there's no
+     * more to catch up before the timeout. It also schedules the actual catch-up once we get the
+     * response from the freshness scan.
+     */
+    void _scanOpTimeForCatchUp_inlock();
+    /**
+     * Wait for data replication until we reach the latest optime, or the timeout expires.
+     * "originalTerm" is the term when catch-up work is scheduled and used to detect
+     * the step-down (and potential following step-up) after catch-up gets scheduled.
+     */
+    void _catchUpOplogToLatest_inlock(const FreshnessScanner& scanner,
+                                      Milliseconds timeout,
+                                      long long originalTerm);
+    /**
+     * Finish catch-up mode and start drain mode.
+     * If "startToDrain" is true, the node enters drain mode. Otherwise, it goes back to secondary
+     * mode.
+     */
+    void _finishCatchUpOplog_inlock(bool startToDrain);
+
    //
    // All member variables are labeled with one of the following codes indicating the
    // synchronization rules for accessing them.
@@ -1176,11 +1219,11 @@ private:
    int _rbid;  // (M)

    // list of information about clients waiting on replication.  Does *not* own the WaiterInfos.
-    std::vector<WaiterInfo*> _replicationWaiterList;  // (M)
+    WaiterList _replicationWaiterList;  // (M)

    // list of information about clients waiting for a particular opTime.
    // Does *not* own the WaiterInfos.
-    std::vector<WaiterInfo*> _opTimeWaiterList;  // (M)
+    WaiterList _opTimeWaiterList;  // (M)

    // Set to true when we are in the process of shutting down replication.
    bool _inShutdown;  // (M)
@@ -1210,6 +1253,9 @@ private:
    // True if we are waiting for the applier to finish draining.
    bool _isWaitingForDrainToComplete;  // (M)

+    // True if we are waiting for oplog catch-up to finish.
+    bool _isCatchingUp = false;  // (M)
+
    // Used to signal threads waiting for changes to _rsConfigState.
    stdx::condition_variable _rsConfigStateChange;  // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index b2686b0cc47..8198dc43211 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -105,6 +105,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
    ASSERT(getReplCoord()->getMemberState().primary())
        << getReplCoord()->getMemberState().toString();

+    simulateCatchUpTimeout();
    ASSERT(getReplCoord()->isWaitingForApplierToDrain());

    const auto txnPtr = makeOperationContext();
@@ -167,6 +168,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
    getReplCoord()->waitForElectionFinish_forTest();
    ASSERT(getReplCoord()->getMemberState().primary())
        << getReplCoord()->getMemberState().toString();
+    // Wait for catchup check to finish.
+    simulateCatchUpTimeout();
    ASSERT(getReplCoord()->isWaitingForApplierToDrain());

    const auto txnPtr = makeOperationContext();
@@ -904,6 +907,244 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
    ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}

+class PrimaryCatchUpTest : public ReplCoordTest {
+protected:
+    using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
+    using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>;
+
+    void simulateSuccessfulV1Voting() {
+        ReplicationCoordinatorImpl* replCoord = getReplCoord();
+        NetworkInterfaceMock* net = getNet();
+
+        auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest();
+        ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
+        log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";
+
+        ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest();
+        ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
+        bool hasReadyRequests = true;
+        // Process requests until we're primary and consume the heartbeats for the notification
+        // of election win. Exit immediately on catch up.
+        while (!replCoord->isCatchingUp() &&
+               (!replCoord->getMemberState().primary() || hasReadyRequests)) {
+            log() << "Waiting on network in state " << replCoord->getMemberState();
+            getNet()->enterNetwork();
+            if (net->now() < electionTimeoutWhen) {
+                net->runUntil(electionTimeoutWhen);
+            }
+            const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+            const RemoteCommandRequest& request = noi->getRequest();
+            log() << request.target.toString() << " processing " << request.cmdObj;
+            ReplSetHeartbeatArgsV1 hbArgs;
+            Status status = hbArgs.initialize(request.cmdObj);
+            if (hbArgs.initialize(request.cmdObj).isOK()) {
+                ReplSetHeartbeatResponse hbResp;
+                hbResp.setSetName(rsConfig.getReplSetName());
+                hbResp.setState(MemberState::RS_SECONDARY);
+                hbResp.setConfigVersion(rsConfig.getConfigVersion());
+                net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true)));
+            } else if (request.cmdObj.firstElement().fieldNameStringData() ==
+                       "replSetRequestVotes") {
+                net->scheduleResponse(noi,
+                                      net->now(),
+                                      makeResponseStatus(BSON("ok" << 1 << "reason"
+                                                                   << ""
+                                                                   << "term"
+                                                                   << request.cmdObj["term"].Long()
+                                                                   << "voteGranted"
+                                                                   << true)));
+            } else {
+                error() << "Black holing unexpected request to " << request.target << ": "
+                        << request.cmdObj;
+                net->blackHole(noi);
+            }
+            net->runReadyNetworkOperations();
+            hasReadyRequests = net->hasReadyRequests();
+            getNet()->exitNetwork();
+        }
+    }
+
+    ReplicaSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) {
+        BSONObj configObj = BSON("_id"
+                                 << "mySet"
+                                 << "version"
+                                 << 1
+                                 << "members"
+                                 << BSON_ARRAY(BSON("_id" << 1 << "host"
+                                                          << "node1:12345")
+                                               << BSON("_id" << 2 << "host"
+                                                             << "node2:12345")
+                                               << BSON("_id" << 3 << "host"
+                                                             << "node3:12345"))
+                                 << "protocolVersion"
+                                 << 1
+                                 << "settings"
+                                 << BSON("catchUpTimeoutMillis" << 5000));
+        assertStartSuccess(configObj, HostAndPort("node1", 12345));
+        ReplicaSetConfig config = assertMakeRSConfig(configObj);
+
+        getReplCoord()->setMyLastAppliedOpTime(opTime);
+        getReplCoord()->setMyLastDurableOpTime(opTime);
+        ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+
+        simulateSuccessfulV1Voting();
+        IsMasterResponse imResponse;
+        getReplCoord()->fillIsMasterForReplSet(&imResponse);
+        ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
+        ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
+
+        return config;
+    }
+
+    ResponseStatus makeFreshnessScanResponse(OpTime opTime) {
+        // OpTime part of replSetGetStatus.
+        return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON())));
+    }
+
+    void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) {
+        NetworkInterfaceMock* net = getNet();
+        net->enterNetwork();
+        while (net->hasReadyRequests()) {
+            const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+            const RemoteCommandRequest& request = noi->getRequest();
+            if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
+                log() << request.target.toString() << " processing " << request.cmdObj;
+                onFreshnessScanRequest(noi);
+            } else {
+                log() << "Black holing unexpected request to " << request.target << ": "
+                      << request.cmdObj;
+                net->blackHole(noi);
+            }
+            net->runReadyNetworkOperations();
+        }
+        net->exitNetwork();
+    }
+};
+
+TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
+    startCapturingLogMessages();
+    OpTime time1(Timestamp(100, 1), 0);
+    ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+    processFreshnessScanRequests([this](const NetworkOpIter noi) {
+        getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
+    });
+    ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+    stopCapturingLogMessages();
+    ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up"));
+    auto txn = makeOperationContext();
+    getReplCoord()->signalDrainComplete(txn.get());
+    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) {
+    startCapturingLogMessages();
+
+    OpTime time1(Timestamp(100, 1), 0);
+    ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+    processFreshnessScanRequests([this](const NetworkOpIter noi) {
+        auto request = noi->getRequest();
+        log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+        getNet()->blackHole(noi);
+    });
+
+    auto net = getNet();
+    net->enterNetwork();
+    net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
+    net->exitNetwork();
+    ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+    stopCapturingLogMessages();
+    ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout"));
+    auto txn = makeOperationContext();
+    getReplCoord()->signalDrainComplete(txn.get());
+    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) {
+    startCapturingLogMessages();
+
+    OpTime time1(Timestamp(100, 1), 0);
+    OpTime time2(Timestamp(100, 2), 0);
+    ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+    processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+        auto net = getNet();
+        // The old primary accepted one more op and all nodes caught up after voting for me.
+        net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+    });
+
+    NetworkInterfaceMock* net = getNet();
+    ASSERT(getReplCoord()->isCatchingUp());
+    // Simulate the work done by bgsync and applier threads.
+    // setMyLastAppliedOpTime() will signal the optime waiter.
+    getReplCoord()->setMyLastAppliedOpTime(time2);
+    net->enterNetwork();
+    net->runReadyNetworkOperations();
+    net->exitNetwork();
+    ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+    stopCapturingLogMessages();
+    ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
+    auto txn = makeOperationContext();
+    getReplCoord()->signalDrainComplete(txn.get());
+    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) {
+    startCapturingLogMessages();
+
+    OpTime time1(Timestamp(100, 1), 0);
+    OpTime time2(Timestamp(100, 2), 0);
+    ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+    // The new primary learns of the latest OpTime.
+    processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+        auto net = getNet();
+        net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+    });
+
+    NetworkInterfaceMock* net = getNet();
+    ASSERT(getReplCoord()->isCatchingUp());
+    net->enterNetwork();
+    net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
+    net->exitNetwork();
+    ASSERT(getReplCoord()->isWaitingForApplierToDrain());
+    stopCapturingLogMessages();
+    ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
+    auto txn = makeOperationContext();
+    getReplCoord()->signalDrainComplete(txn.get());
+    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
+    startCapturingLogMessages();
+
+    OpTime time1(Timestamp(100, 1), 0);
+    OpTime time2(Timestamp(100, 2), 0);
+    ReplicaSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+    processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+        auto request = noi->getRequest();
+        log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+        getNet()->blackHole(noi);
+    });
+    ASSERT(getReplCoord()->isCatchingUp());
+
+    TopologyCoordinator::UpdateTermResult updateTermResult;
+    auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
+    ASSERT_TRUE(evh.isValid());
+    getReplExec()->waitForEvent(evh);
+    ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+    auto net = getNet();
+    net->enterNetwork();
+    net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
+    net->exitNetwork();
+    ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
+    stopCapturingLogMessages();
+    ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
+    ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
+}
+
}  // namespace
}  // namespace repl
}  // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index b7e78539c3a..aa61d91d10a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -97,11 +97,17 @@ struct OpTimeWithTermZero {
};

void runSingleNodeElection(ServiceContext::UniqueOperationContext txn,
-                           ReplicationCoordinatorImpl* replCoord) {
+                           ReplicationCoordinatorImpl* replCoord,
+                           executor::NetworkInterfaceMock* net) {
    replCoord->setMyLastAppliedOpTime(OpTime(Timestamp(1, 0), 0));
    replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
    ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    replCoord->waitForElectionFinish_forTest();
+    // Wait for primary catch-up
+    net->enterNetwork();
+    net->runReadyNetworkOperations();
+    net->exitNetwork();
+
    ASSERT(replCoord->isWaitingForApplierToDrain());
    ASSERT(replCoord->getMemberState().primary())
        << replCoord->getMemberState().toString();
@@ -1626,7 +1632,7 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
                                               << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                        << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    const auto txn = makeOperationContext();
    ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)));
@@ -2525,7 +2531,7 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) {
                                               << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                        << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    time_t lastWriteDate = 101;
    OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1);
@@ -3521,7 +3527,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) {
                                                         << 0))),
                       HostAndPort("node1", 12345));

-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
@@ -3546,7 +3552,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
                                                         << "_id"
                                                         << 0))),
                       HostAndPort("node1", 12345));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
    const auto txn = makeOperationContext();

    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
@@ -3574,7 +3580,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) {
                                                         << "_id"
                                                         << 0))),
                       HostAndPort("node1", 12345));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 1));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 1));
@@ -3598,7 +3604,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) {
                                                         << "_id"
                                                         << 0))),
                       HostAndPort("node1", 12345));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
    OpTime time(Timestamp(100, 0), 1);
    getReplCoord()->setMyLastAppliedOpTime(time);
    getReplCoord()->setMyLastDurableOpTime(time);
@@ -3621,7 +3627,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) {
                                                         << "_id"
                                                         << 0))),
                       HostAndPort("node1", 12345));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1));
    OpTime committedOpTime(Timestamp(200, 0), 1);
@@ -3650,7 +3656,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) {
                                                         << "_id"
                                                         << 0))),
                       HostAndPort("node1", 12345));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1));
@@ -4255,7 +4261,7 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW
                                            << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                     << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    OpTime time1(Timestamp(100, 1), 1);
    OpTime time2(Timestamp(100, 2), 1);
@@ -4288,7 +4294,7 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat
                                               << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                        << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
    OpTime time1(Timestamp(100, 1), 1);
    OpTime time2(Timestamp(100, 2), 1);
@@ -4319,7 +4325,7 @@ TEST_F(ReplCoordTest,
                                               << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                        << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());

    OpTime time1(Timestamp(100, 1), 1);
    OpTime time2(Timestamp(100, 2), 1);
@@ -4352,7 +4358,7 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) {
                                               << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                        << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
    OpTime time1(Timestamp(100, 1), 1);
    OpTime time2(Timestamp(100, 2), 1);
@@ -4381,7 +4387,7 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) {
                                               << BSON_ARRAY(BSON("_id" << 0 << "host"
                                                                        << "test1:1234"))),
                       HostAndPort("test1", 1234));
-    runSingleNodeElection(makeOperationContext(), getReplCoord());
+    runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
    OpTime time1(Timestamp(100, 1), 1);
    OpTime time2(Timestamp(100, 2), 1);
@@ -4755,6 +4761,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {

    // Single node cluster - this node should start election on setFollowerMode() completion.
    replCoord->waitForElectionFinish_forTest();
+    simulateCatchUpTimeout();

    // Successful dry run election increases term.
    ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index b962e9b11bc..19640ba1c55 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -209,6 +209,10 @@ bool ReplicationCoordinatorMock::isWaitingForApplierToDrain() {
    return false;
}

+bool ReplicationCoordinatorMock::isCatchingUp() {
+    return false;
+}
+
void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*) {}

Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 4dd04f8fb3f..706d4832d38 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -130,6 +130,8 @@ public:

    virtual bool isWaitingForApplierToDrain();

+    virtual bool isCatchingUp();
+
    virtual void signalDrainComplete(OperationContext*);

    virtual Status waitForDrainFinish(Milliseconds timeout) override;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index ec0497903a2..c630ddca26d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -315,6 +315,10 @@ void ReplCoordTest::simulateSuccessfulV1Election() {
                                                         << request.cmdObj["term"].Long()
                                                         << "voteGranted"
                                                         << true)));
+        } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
+            // OpTime part of replSetGetStatus for use by FreshnessScanner during catch-up period.
+            BSONObj response = BSON("optimes" << BSON("appliedOpTime" << OpTime().toBSON()));
+            net->scheduleResponse(noi, net->now(), makeResponseStatus(response));
        } else {
            error() << "Black holing unexpected request to " << request.target << ": "
                    << request.cmdObj;
@@ -459,5 +463,33 @@ void ReplCoordTest::disableSnapshots() {
    _externalState->setAreSnapshotsEnabled(false);
}

+void ReplCoordTest::simulateCatchUpTimeout() {
+    NetworkInterfaceMock* net = getNet();
+    auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod();
+    bool hasRequest = false;
+    net->enterNetwork();
+    if (net->now() < catchUpTimeoutWhen) {
+        net->runUntil(catchUpTimeoutWhen);
+    }
+    hasRequest = net->hasReadyRequests();
+    net->exitNetwork();
+
+    while (hasRequest) {
+        net->enterNetwork();
+        auto noi = net->getNextReadyRequest();
+        auto request = noi->getRequest();
+        // Black hole heartbeat requests caused by time advance.
+        log() << "Black holing request to " << request.target.toString() << " : "
+              << request.cmdObj;
+        net->blackHole(noi);
+        if (net->now() < catchUpTimeoutWhen) {
+            net->runUntil(catchUpTimeoutWhen);
+        } else {
+            net->runReadyNetworkOperations();
+        }
+        hasRequest = net->hasReadyRequests();
+        net->exitNetwork();
+    }
+}
+
}  // namespace repl
}  // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index 6e983aa346d..5ea7a0a6953 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -258,6 +258,11 @@ protected:
     */
    void disableSnapshots();

+    /**
+     * Timeout all freshness scan request for primary catch-up.
+     */
+    void simulateCatchUpTimeout();
+
private:
    std::unique_ptr<ReplicationCoordinatorImpl> _repl;
    // Owned by ReplicationCoordinatorImpl
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index b70fcfdbd46..46a34757ef8 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -135,7 +135,8 @@ void RSDataSync::_run() {
        }

        try {
-            if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain()) {
+            if (memberState.primary() && !_replCoord->isWaitingForApplierToDrain() &&
+                !_replCoord->isCatchingUp()) {
                sleepsecs(1);
                continue;
            }
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e6b5e7557ee..a642140b72c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1290,7 +1290,8 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
        Lock::ParallelBatchWriterMode pbwm(txn->lockState());

        auto replCoord = ReplicationCoordinator::get(txn);
-        if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
+        if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain() &&
+            !replCoord->isCatchingUp()) {
            severe() << "attempting to replicate ops while primary";
            return {ErrorCodes::CannotApplyOplogWhilePrimary,
                    "attempting to replicate ops while primary"};
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 708ef8de639..0d6dd7ee627 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -150,11 +150,6 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {

HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
                                                         const Timestamp& lastTimestampApplied) {
-    // If we are primary, then we aren't syncing from anyone (else).
-    if (_iAmPrimary()) {
-        return HostAndPort();
-    }
-
    // If we are not a member of the current replica set configuration, no sync source is valid.
    if (_selfIndex == -1) {
        LOG(2) << "Cannot sync from any members because we are not in the replica set config";