diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-04-20 15:28:38 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-06-16 13:59:20 -0400 |
commit | 7f509b09e3b17a4e6627fdbd9a46d12611745941 (patch) | |
tree | 5afc1422129f9bedc9ebe73cef14b2d360cc9a7a | |
parent | d2749a7c3c325f340df29d9aef3ef62e4125ee08 (diff) | |
download | mongo-7f509b09e3b17a4e6627fdbd9a46d12611745941.tar.gz |
SERVER-26848 Exit catchup mode when not syncing more data.
(cherry picked from commit 4680351e3fe6f8f47c04440f1c5d1232a4ab7b2d)
23 files changed, 779 insertions, 296 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 079c7497427..0115672dcda 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -380,6 +380,7 @@ ], repairCursor: {command: {repairCursor: "view"}, expectFailure: true}, repairDatabase: {command: {repairDatabase: 1}}, + replSetAbortPrimaryCatchUp: {skip: isUnrelated}, replSetElect: {skip: isUnrelated}, replSetFreeze: {skip: isUnrelated}, replSetFresh: {skip: isUnrelated}, diff --git a/jstests/replsets/catchup.js b/jstests/replsets/catchup.js index 7c1cbf41c85..797db8305cd 100644 --- a/jstests/replsets/catchup.js +++ b/jstests/replsets/catchup.js @@ -12,6 +12,7 @@ rst.startSet(); var conf = rst.getReplSetConfig(); + conf.members[2].priority = 0; conf.settings = { heartbeatIntervalMillis: 500, electionTimeoutMillis: 10000, @@ -44,12 +45,6 @@ return node; } - function doWrites(node) { - for (var i = 0; i < 3; i++) { - assert.writeOK(node.getDB("test").foo.insert({x: i})); - } - } - function checkOpInOplog(node, op, count) { node.getDB("admin").getMongo().setSlaveOk(); var oplog = node.getDB("local")['oplog.rs']; @@ -57,16 +52,51 @@ assert.eq(oplog.count(op), count, "op: " + tojson(op) + ", oplog: " + tojson(oplogArray)); } - function isEarlierTimestamp(ts1, ts2) { - if (ts1.getTime() == ts2.getTime()) { - return ts1.getInc() < ts2.getInc(); + // Stop replication on secondaries, do writes and step up one of the secondaries. + // + // The old primary has extra writes that are not replicated to the other nodes yet, + // but the new primary steps up, getting the vote from the the third node "voter". + function stopRelicationAndEnforceNewPrimaryToCatchUp() { + // Write documents that cannot be replicated to secondaries in time. + var oldSecondaries = rst.getSecondaries(); + var oldPrimary = rst.getPrimary(); + stopServerReplication(oldSecondaries); + for (var i = 0; i < 3; i++) { + assert.writeOK(oldPrimary.getDB("test").foo.insert({x: i})); } - return ts1.getTime() < ts2.getTime(); + var latestOpOnOldPrimary = getLatestOp(oldPrimary); + // New primary wins immediately, but needs to catch up. + var newPrimary = stepUpNode(oldSecondaries[0]); + rst.awaitNodesAgreeOnPrimary(); + var latestOpOnNewPrimary = getLatestOp(newPrimary); + // Check this node is not writable. + assert.eq(newPrimary.getDB("test").isMaster().ismaster, false); + + return { + oldSecondaries: oldSecondaries, + oldPrimary: oldPrimary, + newPrimary: newPrimary, + voter: oldSecondaries[1], + latestOpOnOldPrimary: latestOpOnOldPrimary, + latestOpOnNewPrimary: latestOpOnNewPrimary + }; + } + + function reconfigCatchUpTimeoutMillis(timeout) { + // Reconnect all nodes to make sure reconfig succeeds. + rst.nodes.forEach(reconnect); + // Reconfigure replicaset to decrease catchup timeout + conf = rst.getReplSetConfigFromNode(); + conf.version++; + conf.settings.catchUpTimeoutMillis = timeout; + reconfig(rst, conf); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); + rst.awaitNodesAgreeOnPrimary(); } rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); - jsTest.log("Case 1: The primary is up-to-date after freshness scan."); + jsTest.log("Case 1: The primary is up-to-date after refreshing heartbeats."); // Should complete transition to primary immediately. var newPrimary = stepUpNode(rst.getSecondary()); // Should win an election and finish the transition very quickly. @@ -74,77 +104,95 @@ rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); jsTest.log("Case 2: The primary needs to catch up, succeeds in time."); - // Write documents that cannot be replicated to secondaries in time. - var originalSecondaries = rst.getSecondaries(); - stopServerReplication(originalSecondaries); - doWrites(rst.getPrimary()); - var latestOp = getLatestOp(rst.getPrimary()); - // New primary wins immediately, but needs to catch up. - newPrimary = stepUpNode(rst.getSecondary()); - // Check this node is not writable. - assert.eq(newPrimary.getDB("test").isMaster().ismaster, false); + var stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); + // Disable fail point to allow replication. - restartServerReplication(originalSecondaries); + restartServerReplication(stepUpResults.oldSecondaries); // getPrimary() blocks until the primary finishes drain mode. - assert.eq(newPrimary, rst.getPrimary()); + assert.eq(stepUpResults.newPrimary, rst.getPrimary()); // Wait for all secondaries to catch up - rst.awaitReplication(); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); // Check the latest op on old primary is preserved on the new one. - checkOpInOplog(newPrimary, latestOp, 1); + checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1); rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); jsTest.log("Case 3: The primary needs to catch up, but has to change sync source to catch up."); - // Write documents that cannot be replicated to secondaries in time. - stopServerReplication(rst.getSecondaries()); - doWrites(rst.getPrimary()); - var oldPrimary = rst.getPrimary(); - originalSecondaries = rst.getSecondaries(); - latestOp = getLatestOp(oldPrimary); - newPrimary = stepUpNode(originalSecondaries[0]); - // Disable fail point on one of the other secondaries. - // Wait until it catches up with the old primary. - restartServerReplication(originalSecondaries[1]); - assert.commandWorked(originalSecondaries[1].adminCommand({replSetSyncFrom: oldPrimary.host})); - awaitOpTime(originalSecondaries[1], latestOp.ts); + stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); + + // Disable fail point on the voter. Wait until it catches up with the old primary. + restartServerReplication(stepUpResults.voter); + assert.commandWorked( + stepUpResults.voter.adminCommand({replSetSyncFrom: stepUpResults.oldPrimary.host})); + awaitOpTime(stepUpResults.voter, stepUpResults.latestOpOnOldPrimary.ts); // Disconnect the new primary and the old one. - oldPrimary.disconnect(newPrimary); + stepUpResults.oldPrimary.disconnect(stepUpResults.newPrimary); // Disable the failpoint, the new primary should sync from the other secondary. - restartServerReplication(newPrimary); - assert.eq(newPrimary, rst.getPrimary()); - checkOpInOplog(newPrimary, latestOp, 1); + restartServerReplication(stepUpResults.newPrimary); + assert.eq(stepUpResults.newPrimary, rst.getPrimary()); + checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1); // Restore the broken connection - oldPrimary.reconnect(newPrimary); + stepUpResults.oldPrimary.reconnect(stepUpResults.newPrimary); rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); jsTest.log("Case 4: The primary needs to catch up, fails due to timeout."); - // Reconfigure replicaset to decrease catchup timeout - conf = rst.getReplSetConfigFromNode(); - conf.version++; - conf.settings.catchUpTimeoutMillis = 10 * 1000; - reconfig(rst, conf); + reconfigCatchUpTimeoutMillis(10 * 1000); + + stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); + // Wait until the new primary completes the transition to primary and writes a no-op. + checkLog.contains(stepUpResults.newPrimary, "Catchup timed out after becoming primary"); + restartServerReplication(stepUpResults.newPrimary); + assert.eq(stepUpResults.newPrimary, rst.getPrimary()); + + // Wait for the no-op "new primary" after winning an election, so that we know it has + // finished transition to primary. + assert.soon(function() { + return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary, + getLatestOp(stepUpResults.newPrimary)) < 0; + }); + // The extra oplog entries on the old primary are not replicated to the new one. + checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0); + restartServerReplication(stepUpResults.voter); rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); - rst.awaitNodesAgreeOnPrimary(); - // Write documents that cannot be replicated to secondaries in time. - originalSecondaries = rst.getSecondaries(); - stopServerReplication(originalSecondaries); - doWrites(rst.getPrimary()); - latestOp = getLatestOp(rst.getPrimary()); + jsTest.log("Case 5: The primary needs to catch up with no timeout, then gets aborted."); + reconfigCatchUpTimeoutMillis(-1); + stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); - // New primary wins immediately, but needs to catch up. - newPrimary = stepUpNode(originalSecondaries[0]); - var latestOpOnNewPrimary = getLatestOp(newPrimary); - // Wait until the new primary completes the transition to primary and writes a no-op. - checkLog.contains(newPrimary, "Cannot catch up oplog after becoming primary"); - restartServerReplication(newPrimary); - assert.eq(newPrimary, rst.getPrimary()); + // Abort catchup. + assert.commandWorked(stepUpResults.newPrimary.adminCommand({replSetAbortPrimaryCatchUp: 1})); // Wait for the no-op "new primary" after winning an election, so that we know it has // finished transition to primary. assert.soon(function() { - return isEarlierTimestamp(latestOpOnNewPrimary.ts, getLatestOp(newPrimary).ts); + return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary, + getLatestOp(stepUpResults.newPrimary)) < 0; }); // The extra oplog entries on the old primary are not replicated to the new one. - checkOpInOplog(newPrimary, latestOp, 0); - restartServerReplication(originalSecondaries[1]); + checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0); + restartServerReplication(stepUpResults.oldSecondaries); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); + checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0); + + // TODO: Uncomment case 6 when SERVER-28751 gets fixed. + // + // jsTest.log("Case 6: The primary needs to catch up with no timeout, but steps down."); + // var stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); + + // // Step-down command should abort catchup. + // try { + // printjson(stepUpResults.newPrimary.adminCommand({replSetStepDown: 60})); + // } catch (e) { + // print(e); + // } + // // Rename the primary. + // var steppedDownPrimary = stepUpResults.newPrimary; + // var newPrimary = rst.getPrimary(); + // assert.neq(newPrimary, steppedDownPrimary); + + // // Enable data replication on the stepped down primary and make sure it syncs old writes. + // rst.nodes.forEach(reconnect); + // restartServerReplication(stepUpResults.oldSecondaries); + // rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); + // checkOpInOplog(steppedDownPrimary, stepUpResults.latestOpOnOldPrimary, 1); + })(); diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index 1471824bd8f..5911723d717 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -162,6 +162,7 @@ var getLastOpTime; if (!isNetworkError(e)) { throw e; } + print("Calling replSetReconfig failed. " + tojson(e)); } var master = rs.getPrimary().getDB("admin"); diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 9223b73e4e9..2c04c40bf55 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -308,13 +308,12 @@ void BackgroundSync::_produce(OperationContext* opCtx) { if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) { // All (accessible) sync sources were too stale. - // TODO: End catchup mode early if we are too stale. if (_replCoord->getMemberState().primary()) { warning() << "Too stale to catch up."; log() << "Our newest OpTime : " << lastOpTimeFetched; log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen << " from " << syncSourceResp.getSyncSource(); - sleepsecs(1); + _replCoord->abortCatchupIfNeeded(); return; } @@ -472,9 +471,8 @@ void BackgroundSync::_produce(OperationContext* opCtx) { return; } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) { if (_replCoord->getMemberState().primary()) { - // TODO: Abort catchup mode early if rollback detected. - warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; - sleepsecs(1); + warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode."; + _replCoord->abortCatchupIfNeeded(); return; } diff --git a/src/mongo/db/repl/member_heartbeat_data.cpp b/src/mongo/db/repl/member_heartbeat_data.cpp index c267a6ba8ed..1b9b9ea3f13 100644 --- a/src/mongo/db/repl/member_heartbeat_data.cpp +++ b/src/mongo/db/repl/member_heartbeat_data.cpp @@ -54,6 +54,8 @@ void MemberHeartbeatData::setUpValues(Date_t now, } _authIssue = false; _lastHeartbeat = now; + _updatedSinceRestart = true; + if (!hbResponse.hasState()) { hbResponse.setState(MemberState::RS_UNKNOWN); } @@ -77,6 +79,7 @@ void MemberHeartbeatData::setDownValues(Date_t now, const std::string& heartbeat _upSince = Date_t(); _lastHeartbeat = now; _authIssue = false; + _updatedSinceRestart = true; _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_DOWN); @@ -91,6 +94,7 @@ void MemberHeartbeatData::setAuthIssue(Date_t now) { _upSince = Date_t(); _lastHeartbeat = now; _authIssue = true; + _updatedSinceRestart = true; _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_UNKNOWN); diff --git a/src/mongo/db/repl/member_heartbeat_data.h b/src/mongo/db/repl/member_heartbeat_data.h index 0f5dacd9081..f67a0a87757 100644 --- a/src/mongo/db/repl/member_heartbeat_data.h +++ b/src/mongo/db/repl/member_heartbeat_data.h @@ -123,6 +123,17 @@ public: */ void setAuthIssue(Date_t now); + /** + * Reset the boolean to record the last restart. + */ + void restart() { + _updatedSinceRestart = false; + } + + bool isUpdatedSinceRestart() const { + return _updatedSinceRestart; + } + private: // -1 = not checked yet, 0 = member is down/unreachable, 1 = member is up int _health; @@ -139,6 +150,9 @@ private: // The last heartbeat response we received. ReplSetHeartbeatResponse _lastResponse; + + // Have we received heartbeats since the last restart? + bool _updatedSinceRestart = false; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index c2793e42764..07add4b7842 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -877,7 +877,7 @@ public: status = getGlobalReplicationCoordinator()->stepUpIfEligible(); if (!status.isOK()) { - log() << "replSetStepUp request failed " << causedBy(status); + log() << "replSetStepUp request failed" << causedBy(status); } return appendCommandStatus(result, status); @@ -889,5 +889,39 @@ private: } } cmdReplSetStepUp; +class CmdReplSetAbortPrimaryCatchUp : public ReplSetCommand { +public: + virtual void help(stringstream& help) const { + help << "{ CmdReplSetAbortPrimaryCatchUp : 1 }\n"; + help << "Abort primary catch-up mode; immediately finish the transition to primary " + "without fetching any further unreplicated writes from any other online nodes"; + } + + CmdReplSetAbortPrimaryCatchUp() : ReplSetCommand("replSetAbortPrimaryCatchUp") {} + + virtual bool run(OperationContext* opCtx, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) override { + Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + if (!status.isOK()) + return appendCommandStatus(result, status); + log() << "Received replSetAbortPrimaryCatchUp request"; + + status = getGlobalReplicationCoordinator()->abortCatchupIfNeeded(); + if (!status.isOK()) { + log() << "replSetAbortPrimaryCatchUp request failed" << causedBy(status); + } + return appendCommandStatus(result, status); + } + +private: + ActionSet getAuthActionSet() const override { + return ActionSet{ActionType::replSetStateChange}; + } +} cmdReplSetAbortPrimaryCatchUp; + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index 929f2c5d2f0..1ad2d245ac6 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -44,6 +44,7 @@ namespace repl { const size_t ReplSetConfig::kMaxMembers; const size_t ReplSetConfig::kMaxVotingMembers; +const Milliseconds ReplSetConfig::kInfiniteCatchUpTimeout(-1); const std::string ReplSetConfig::kConfigServerFieldName = "configsvr"; const std::string ReplSetConfig::kVersionFieldName = "version"; @@ -51,7 +52,7 @@ const std::string ReplSetConfig::kMajorityWriteConcernModeName = "$majority"; const Milliseconds ReplSetConfig::kDefaultHeartbeatInterval(2000); const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10); const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000); -const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(2000); +const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(60000); const bool ReplSetConfig::kDefaultChainingAllowed(true); namespace { @@ -270,14 +271,14 @@ Status ReplSetConfig::_parseSettingsSubdocument(const BSONObj& settings) { // // Parse catchUpTimeoutMillis // - auto notLessThanZero = stdx::bind(std::greater_equal<long long>(), stdx::placeholders::_1, 0); + auto validCatchUpTimeout = [](long long timeout) { return timeout >= 0LL || timeout == -1LL; }; long long catchUpTimeoutMillis; Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf( settings, kCatchUpTimeoutFieldName, durationCount<Milliseconds>(kDefaultCatchUpTimeoutPeriod), - notLessThanZero, - "catch-up timeout must be greater than or equal to 0", + validCatchUpTimeout, + "catch-up timeout must be positive, 0 (no catch-up) or -1 (infinite catch-up).", &catchUpTimeoutMillis); if (!catchUpTimeoutStatus.isOK()) { return catchUpTimeoutStatus; diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index e44e51b12a7..63d5bcb80e8 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -59,6 +59,7 @@ public: static const size_t kMaxMembers = 50; static const size_t kMaxVotingMembers = 7; + static const Milliseconds kInfiniteCatchUpTimeout; static const Milliseconds kDefaultElectionTimeoutPeriod; static const Milliseconds kDefaultHeartbeatInterval; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index fd7f0df661d..8c7e87b1c8b 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -855,6 +855,12 @@ public: virtual Status stepUpIfEligible() = 0; + + /** + * Abort catchup if the node is in catchup mode. + */ + virtual Status abortCatchupIfNeeded() = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 8632eddc42a..5e1ee6d886d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -156,54 +156,45 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const { return toBSON().toString(); } -struct ReplicationCoordinatorImpl::WaiterInfo { +ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) + : opTime(std::move(_opTime)), writeConcern(_writeConcern) {} - using FinishFunc = stdx::function<void()>; +BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const { + BSONObjBuilder bob; + bob.append("opTime", opTime.toBSON()); + if (writeConcern) { + bob.append("writeConcern", writeConcern->toBSON()); + } + return bob.obj(); +}; - WaiterInfo(unsigned int _opID, - const OpTime _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar) - : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {} +std::string ReplicationCoordinatorImpl::Waiter::toString() const { + return toBSON().toString(); +}; - // When waiter is signaled, finishCallback will be called while holding replCoord _mutex - // since WaiterLists are protected by _mutex. - WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback) - : opTime(_opTime), finishCallback(_finishCallback) {} - BSONObj toBSON() const { - BSONObjBuilder bob; - bob.append("opId", opID); - bob.append("opTime", opTime.toBSON()); - if (writeConcern) { - bob.append("writeConcern", writeConcern->toBSON()); - } - return bob.obj(); - }; +ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar) + : Waiter(_opTime, _writeConcern), condVar(_condVar) {} - std::string toString() const { - return toBSON().toString(); - }; +void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() { + invariant(condVar); + condVar->notify_all(); +} - // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex. - void notify() { - if (condVar) { - condVar->notify_all(); - } - if (finishCallback) { - finishCallback(); - } - } +ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime, + FinishFunc _finishCallback) + : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {} + +void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() { + invariant(finishCallback); + finishCallback(); +} - const unsigned int opID = 0; - const OpTime opTime; - const WriteConcernOptions* writeConcern = nullptr; - stdx::condition_variable* condVar = nullptr; - // The callback that will be called when this waiter is notified. - FinishFunc finishCallback = nullptr; -}; -struct ReplicationCoordinatorImpl::WaiterInfoGuard { +class ReplicationCoordinatorImpl::WaiterGuard { +public: /** * Constructor takes the list of waiters and enqueues itself on the list, removing itself * in the destructor. @@ -215,23 +206,17 @@ struct ReplicationCoordinatorImpl::WaiterInfoGuard { * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterInfoGuard(WaiterList* list, - unsigned int opID, - const OpTime opTime, - const WriteConcernOptions* writeConcern, - stdx::condition_variable* condVar) - : waiter(opID, opTime, writeConcern, condVar), _list(list) { - list->add_inlock(&waiter); + WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) { + list->add_inlock(_waiter); } - ~WaiterInfoGuard() { - _list->remove_inlock(&waiter); + ~WaiterGuard() { + _list->remove_inlock(_waiter); } - WaiterInfo waiter; - private: WaiterList* _list; + Waiter* _waiter; }; void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { @@ -240,33 +225,46 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock( stdx::function<bool(WaiterType)> func) { - std::vector<WaiterType>::iterator it = _list.end(); - while (true) { - it = std::find_if(_list.begin(), _list.end(), func); - if (it == _list.end()) { - break; + // Only advance iterator when the element doesn't match. + for (auto it = _list.begin(); it != _list.end();) { + if (!func(*it)) { + ++it; + continue; } - (*it)->notify(); - std::swap(*it, _list.back()); - _list.pop_back(); + + WaiterType waiter = std::move(*it); + if (it == std::prev(_list.end())) { + // Iterator will be invalid after erasing the last element, so set it to the + // next one (i.e. end()). + it = _list.erase(it); + } else { + // Iterator is still valid after pop_back(). + std::swap(*it, _list.back()); + _list.pop_back(); + } + + // It's important to call notify() after the waiter has been removed from the list + // since notify() might remove the waiter itself. + waiter->notify_inlock(); } } void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() { - for (auto& waiter : _list) { - waiter->notify(); + std::vector<WaiterType> list = std::move(_list); + // Call notify() after removing the waiters from the list. + for (auto& waiter : list) { + waiter->notify_inlock(); } - _list.clear(); } bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) { auto it = std::find(_list.begin(), _list.end(), waiter); - if (it != _list.end()) { - std::swap(*it, _list.back()); - _list.pop_back(); - return true; + if (it == _list.end()) { + return false; } - return false; + std::swap(*it, _list.back()); + _list.pop_back(); + return true; } namespace { @@ -1229,7 +1227,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime); _opTimeWaiterList.signalAndRemoveIf_inlock( - [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; }); + [opTime](Waiter* waiter) { return waiter->opTime <= opTime; }); } void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime, @@ -1329,11 +1327,11 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn, // We just need to wait for the opTime to catch up to what we need (not majority RC). stdx::condition_variable condVar; - WaiterInfoGuard waitInfo( - &_opTimeWaiterList, txn->getOpID(), targetOpTime, nullptr, &condVar); + ThreadWaiter waiter(targetOpTime, nullptr, &condVar); + WaiterGuard guard(&_opTimeWaiterList, &waiter); - LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until " - << txn->getDeadline(); + LOG(3) << "waituntilOpTime: OpID " << txn->getOpID() << " is waiting for OpTime " << waiter + << " until " << txn->getDeadline(); auto waitStatus = txn->waitForConditionOrInterruptNoAssert(condVar, lock); if (!waitStatus.isOK()) { @@ -1720,8 +1718,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; - WaiterInfoGuard waitInfo( - &_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar); + ThreadWaiter waiter(opTime, &writeConcern, &condVar); + WaiterGuard guard(&_replicationWaiterList, &waiter); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { if (_inShutdown) { @@ -1739,7 +1737,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( BSONObjBuilder progress; _appendSlaveInfoData_inlock(&progress); log() << "Replication for failed WC: " << writeConcern.toBSON() - << ", waitInfo:" << waitInfo.waiter.toBSON() + << ", waitInfo: " << waiter << ", opID: " << txn->getOpID() << ", progress: " << progress.done(); } return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; @@ -2603,8 +2601,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { result = kActionFollowerModeStateChange; } - // Enable replication producer and applier on stepdown. + // Exit catchup mode if we're in it and enable replication producer and applier on stepdown. if (_memberState.primary()) { + if (_catchupState) { + _catchupState->abort_inlock(); + } _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -2706,13 +2707,18 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( invariant(nextAction != kActionWinElection); lk.unlock(); _performPostMemberStateUpdateAction(nextAction); - // Notify all secondaries of the election win. - _scheduleElectionWinNotification(); lk.lock(); + if (!_getMemberState_inlock().primary()) { + break; + } + // Notify all secondaries of the election win. + _restartHeartbeats_inlock(); if (isV1ElectionProtocol()) { - _scanOpTimeForCatchUp_inlock(); + invariant(!_catchupState); + _catchupState = stdx::make_unique<CatchupState>(this); + _catchupState->start_inlock(); } else { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); } break; } @@ -2727,13 +2733,114 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } } +void ReplicationCoordinatorImpl::CatchupState::start_inlock() { + log() << "Entering primary catch-up mode."; + + // No catchup in single node replica set. + if (_repl->_rsConfig.getNumMembers() == 1) { + abort_inlock(); + return; + } + + auto timeoutCB = [this](const CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + log() << "Catchup timed out after becoming primary."; + stdx::lock_guard<stdx::mutex> lk(_repl->_mutex); + abort_inlock(); + }; + + // Schedule timeout callback. + auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod(); + // Deal with infinity and overflow - no timeout. + if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout || + Date_t::max() - _repl->_replExecutor.now() <= catchupTimeout) { + return; + } + auto timeoutDate = _repl->_replExecutor.now() + catchupTimeout; + auto status = _repl->_replExecutor.scheduleWorkAt(timeoutDate, timeoutCB); + if (!status.isOK()) { + log() << "Failed to schedule catchup timeout work."; + abort_inlock(); + return; + } + _timeoutCbh = status.getValue(); +} + +void ReplicationCoordinatorImpl::CatchupState::abort_inlock() { + invariant(_repl->_getMemberState_inlock().primary()); + + log() << "Exited primary catch-up mode."; + // Clean up its own members. + if (_timeoutCbh) { + _repl->_replExecutor.cancel(_timeoutCbh); + } + if (_waiter) { + _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); + } + + // Enter primary drain mode. + _repl->_enterDrainMode_inlock(); + // Destruct the state itself. + _repl->_catchupState.reset(nullptr); +} + +void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() { + auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart(); + // Haven't collected all heartbeat responses. + if (!targetOpTime) { + return; + } + + // We've caught up. + if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { + log() << "Caught up to the latest known optime via heartbeats after becoming primary."; + abort_inlock(); + return; + } + + // Reset the target optime if it has changed. + if (_waiter && _waiter->opTime == *targetOpTime) { + return; + } + + log() << "Heartbeats updated catchup target optime to " << *targetOpTime; + if (_waiter) { + _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); + } + auto targetOpTimeCB = [this, targetOpTime]() { + // Double check the target time since stepdown may signal us too. + if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { + log() << "Caught up to the latest known optime successfully after becoming primary."; + abort_inlock(); + } + }; + _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB); + _repl->_opTimeWaiterList.add_inlock(_waiter.get()); +} + +Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() { + if (!isV1ElectionProtocol()) { + return Status(ErrorCodes::CommandNotSupported, + "Primary catch-up is only supported by Protocol Version 1"); + } + + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_catchupState) { + _catchupState->abort_inlock(); + return Status::OK(); + } + return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode."); +} + void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto scanner = std::make_shared<FreshnessScanner>(); auto scanStartTime = _replExecutor.now(); auto evhStatus = scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2742,7 +2849,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { LockGuard lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2771,7 +2878,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Could not access any nodes within timeout when checking for " << "additional ops to apply before finishing transition to primary. " << "Will move forward with becoming primary anyway."; - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } @@ -2780,7 +2887,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { log() << "My optime is most up-to-date, skipping catch-up " << "and completing transition to primary."; - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); return; } @@ -2793,9 +2900,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Finished catch-up oplog after becoming primary."; } - _finishCatchingUpOplog_inlock(); + _enterDrainMode_inlock(); }; - auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB); + auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB); _opTimeWaiterList.add_inlock(waiterInfo.get()); auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) { @@ -2808,7 +2915,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca _replExecutor.scheduleWorkAt(_replExecutor.now() + timeout, timeoutCB); } -void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { +void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); } @@ -2898,7 +3005,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC } void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { - _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) { + _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) { return _doneWaitingForReplication_inlock( waiter->opTime, SnapshotName::min(), *waiter->writeConcern); }); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index a2b97ae9ce5..cc72381f746 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -326,6 +326,8 @@ public: virtual Status stepUpIfEligible() override; + virtual Status abortCatchupIfNeeded() override; + // ================== Test support API =================== /** @@ -483,16 +485,55 @@ private: kActionStartSingleNodeElection }; - // Struct that holds information about clients waiting for replication. - struct WaiterInfo; - struct WaiterInfoGuard; + // Abstract struct that holds information about clients waiting for replication. + // Subclasses need to define how to notify them. + struct Waiter { + Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern); + virtual ~Waiter() = default; + + BSONObj toBSON() const; + std::string toString() const; + // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex. + virtual void notify_inlock() = 0; + + const OpTime opTime; + const WriteConcernOptions* writeConcern = nullptr; + }; + + // When ThreadWaiter gets notified, it will signal the conditional variable. + // + // This is used when a thread wants to block inline until the opTime is reached with the given + // writeConcern. + struct ThreadWaiter : public Waiter { + ThreadWaiter(OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar); + void notify_inlock() override; + + stdx::condition_variable* condVar = nullptr; + }; + + // When the waiter is notified, finishCallback will be called while holding replCoord _mutex + // since WaiterLists are protected by _mutex. + // + // This is used when we want to run a callback when the opTime is reached. + struct CallbackWaiter : public Waiter { + using FinishFunc = stdx::function<void()>; + + CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback); + void notify_inlock() override; + + // The callback that will be called when this waiter is notified. + FinishFunc finishCallback = nullptr; + }; + + class WaiterGuard; class WaiterList { public: - using WaiterType = WaiterInfo*; + using WaiterType = Waiter*; - // Adds waiter into the list. Usually, the waiter will be signaled only once and then - // removed. + // Adds waiter into the list. void add_inlock(WaiterType waiter); // Returns whether waiter is found and removed. bool remove_inlock(WaiterType waiter); @@ -529,6 +570,44 @@ private: typedef std::vector<ReplicationExecutor::CallbackHandle> HeartbeatHandles; + // The state and logic of primary catchup. + // + // When start() is called, CatchupState will schedule the timeout callback. When we get + // responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is + // set. + // The primary exits catchup mode when any of the following happens. + // 1) My last applied optime reaches the target optime, if we've received a heartbeat from all + // nodes. + // 2) Catchup timeout expires. + // 3) Primary steps down. + // 4) The primary has to roll back to catch up. + // 5) The primary is too stale to catch up. + // + // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the + // life cycle of the state object aligns with the conceptual state. + // In shutdown, the timeout callback will be canceled by the executor and the state is safe to + // destroy. + // + // Any function of the state must be called while holding _mutex. + class CatchupState { + public: + CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {} + // start() can only be called once. + void start_inlock(); + // Reset the state itself to destruct the state. + void abort_inlock(); + // Heartbeat calls this function to update the target optime. + void signalHeartbeatUpdate_inlock(); + + private: + ReplicationCoordinatorImpl* _repl; // Not owned. + // Callback handle used to cancel a scheduled catchup timeout callback. + ReplicationExecutor::CallbackHandle _timeoutCbh; + // Handle to a Waiter that contains the current target optime to reach after which + // we can exit catchup mode. + std::unique_ptr<CallbackWaiter> _waiter; + }; + /** * Appends a "replicationProgress" section with data for each member in set. */ @@ -1158,7 +1237,7 @@ private: /** * Finish catch-up mode and start drain mode. */ - void _finishCatchingUpOplog_inlock(); + void _enterDrainMode_inlock(); /** * Waits for the config state to leave kConfigStartingUp, which indicates that start() has @@ -1398,6 +1477,10 @@ private: mutable stdx::mutex _indexPrefetchMutex; ReplSettings::IndexPrefetchConfig _indexPrefetchConfig = ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I) + + // The catchup state including all catchup logic. The presence of a non-null pointer indicates + // that the node is currently in catchup mode. + std::unique_ptr<CatchupState> _catchupState; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index e5fa8aa73d1..b8b32295193 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -159,7 +159,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - simulateCatchUpTimeout(); + simulateCatchUpAbort(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto txnPtr = makeOperationContext(); @@ -222,8 +222,6 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->waitForElectionFinish_forTest(); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - // Wait for catchup check to finish. - simulateCatchUpTimeout(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto txnPtr = makeOperationContext(); @@ -1276,15 +1274,19 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) class PrimaryCatchUpTest : public ReplCoordTest { protected: using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator; - using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>; + using NetworkRequestFn = stdx::function<void(const NetworkOpIter)>; - void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) { + const Timestamp smallTimestamp{1, 1}; + + ResponseStatus makeHeartbeatResponse(OpTime opTime) { ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest(); ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true))); + hbResp.setAppliedOpTime(opTime); + hbResp.setDurableOpTime(opTime); + return makeResponseStatus(hbResp.toBSON(true)); } void simulateSuccessfulV1Voting() { @@ -1296,10 +1298,9 @@ protected: log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)"; ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); - bool hasReadyRequests = true; - // Process requests until we're primary and consume the heartbeats for the notification + // Process requests until we're primary but leave the heartbeats for the notification // of election win. Exit immediately on unexpected requests. - while (!replCoord->getMemberState().primary() || hasReadyRequests) { + while (!replCoord->getMemberState().primary()) { log() << "Waiting on network in state " << replCoord->getMemberState(); net->enterNetwork(); if (net->now() < electionTimeoutWhen) { @@ -1310,7 +1311,9 @@ protected: const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { - replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); + OpTime opTime(Timestamp(), getReplCoord()->getTerm()); + net->scheduleResponse( + net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime)); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { net->scheduleResponse(net->getNextReadyRequest(), @@ -1332,12 +1335,11 @@ protected: // executor. getReplExec()->waitForDBWork_forTest(); net->runReadyNetworkOperations(); - hasReadyRequests = net->hasReadyRequests(); net->exitNetwork(); } } - ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) { + ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, bool infiniteTimeout = false) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -1352,7 +1354,8 @@ protected: << "protocolVersion" << 1 << "settings" - << BSON("catchUpTimeoutMillis" << 5000)); + << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis" + << (infiniteTimeout ? -1 : 5000))); assertStartSuccess(configObj, HostAndPort("node1", 12345)); ReplSetConfig config = assertMakeRSConfig(configObj); @@ -1374,17 +1377,15 @@ protected: return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON()))); } - void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) { + void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) { NetworkInterfaceMock* net = getNet(); net->enterNetwork(); while (net->hasReadyRequests()) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; - if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") { - onFreshnessScanRequest(noi); - } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { - replyToHeartbeatRequestAsSecondaries(noi); + if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { + onHeartbeatRequest(noi); } else { log() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; @@ -1395,7 +1396,8 @@ protected: net->exitNetwork(); } - void replyHeartbeatsAndRunUntil(Date_t until) { + // Response heartbeats with opTime until the given time. Exit if it sees any other request. + void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) { auto net = getNet(); net->enterNetwork(); while (net->now() < until) { @@ -1403,9 +1405,10 @@ protected: // Peek the next request auto noi = net->getFrontOfUnscheduledQueue(); auto& request = noi->getRequest(); + log() << request.target << " at " << net->now() << " processing " << request.cmdObj; if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { // Consume the next request - replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); + onHeartbeatRequest(net->getNextReadyRequest()); } else { // Cannot consume other requests than heartbeats. net->exitNetwork(); @@ -1416,122 +1419,148 @@ protected: } net->exitNetwork(); } + + // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal + // the optime waiter. + void advanceMyLastAppliedOpTime(OpTime opTime) { + getReplCoord()->setMyLastAppliedOpTime(opTime); + getNet()->enterNetwork(); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + } }; -TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) { +// The first round of heartbeats indicates we are the most up-to-date. +TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - processFreshnessScanRequests([this](const NetworkOpIter noi) { - getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); + int count = 0; + processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) { + count++; + auto net = getNet(); + // The old primary accepted one more op and all nodes caught up after voting for me. + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1)); }); + + // Get 2 heartbeats from secondaries. + ASSERT_EQUALS(2, count); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up")); - auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } -TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) { +// Heartbeats set a future target OpTime and we reached that successfully. +TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this](const NetworkOpIter noi) { - auto request = noi->getRequest(); - log() << "Black holing request to " << request.target << ": " << request.cmdObj; - getNet()->blackHole(noi); + processHeartbeatRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + // The old primary accepted one more op and all nodes caught up after voting for me. + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); - - auto net = getNet(); - replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod()); - ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + advanceMyLastAppliedOpTime(time2); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout")); - auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); + ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } -TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) { +TEST_F(PrimaryCatchUpTest, CatchupTimeout) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); + replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) { + // Other nodes are ahead of me. + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); - - NetworkInterfaceMock* net = getNet(); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - // Simulate the work done by bgsync and applier threads. - // setMyLastAppliedOpTime() will signal the optime waiter. - getReplCoord()->setMyLastAppliedOpTime(time2); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); - auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); + ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } -TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) { +TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); - OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - // The new primary learns of the latest OpTime. - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + // We should get caught up by the timeout time. + auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); + replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host"); + getNet()->scheduleResponse(noi, getNet()->now(), status); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); + } }); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); +} - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); +TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) { + startCapturingLogMessages(); + + OpTime time1(Timestamp(100, 1), 0); + ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); + // We should get caught up by the timeout time. + auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); + replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + log() << "Black holing heartbeat from " << request.target.host(); + getNet()->blackHole(noi); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); + } + }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); - auto txn = makeOperationContext(); - getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); } -TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) { +TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto request = noi->getRequest(); - log() << "Black holing request to " << request.target << ": " << request.cmdObj; - getNet()->blackHole(noi); - }); + // Step down immediately. ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary")); + ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); + ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test")); } @@ -1541,30 +1570,24 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + // Step down in the middle of catchup. + auto abortTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2; + replyHeartbeatsAndRunUntil(abortTime, [this, time2](const NetworkOpIter noi) { + // Other nodes are ahead of me. + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - auto net = getNet(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - auto txn = makeOperationContext(); - // Simulate the applier signaling replCoord to exit drain mode. - // At this point, we see the stepdown and reset the states. - getReplCoord()->signalDrainComplete(txn.get(), getReplCoord()->getTerm()); + // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); + ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); + ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test")); } @@ -1575,24 +1598,17 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { + processHeartbeatRequests([this, time2](const NetworkOpIter noi) { auto net = getNet(); // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); - - NetworkInterfaceMock* net = getNet(); ReplicationCoordinatorImpl* replCoord = getReplCoord(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - // Simulate the work done by bgsync and applier threads. - // setMyLastAppliedOpTime() will signal the optime waiter. - replCoord->setMyLastAppliedOpTime(time2); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); + advanceMyLastAppliedOpTime(time2); ASSERT(replCoord->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); + ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime")); // Step down during drain mode. TopologyCoordinator::UpdateTermResult updateTermResult; @@ -1606,9 +1622,10 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { simulateSuccessfulV1Voting(); ASSERT_TRUE(replCoord->getMemberState().primary()); - // No need to catch-up, so we enter drain mode. - processFreshnessScanRequests([this](const NetworkOpIter noi) { - getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); + // No need to catch up, so we enter drain mode. + processHeartbeatRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); ASSERT(replCoord->getApplierState() == ApplierState::Draining); ASSERT_FALSE(replCoord->canAcceptWritesForDatabase("test")); @@ -1618,6 +1635,111 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { ASSERT_TRUE(replCoord->canAcceptWritesForDatabase("test")); } +TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { + OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(200, 1), 0); + OpTime time3(Timestamp(300, 1), 0); + OpTime time4(Timestamp(400, 1), 0); + + // 1) The primary is at time 1 at the beginning. + ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); + + // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available. + // So the target optime is time 3. + startCapturingLogMessages(); + auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3; + replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host"); + getNet()->scheduleResponse(noi, getNet()->now(), status); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3)); + } + }); + // The node is still in catchup mode, but the target optime has been set. + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); + + // 3) Advancing its applied optime to time 2 isn't enough. + advanceMyLastAppliedOpTime(time2); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + + // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target. + startCapturingLogMessages(); + auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3; + replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) { + const RemoteCommandRequest& request = noi->getRequest(); + if (request.target.host() == "node2") { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4)); + } else { + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3)); + } + }); + // The node is still in catchup mode, but the target optime has been updated. + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); + + // 5) Advancing to time 3 isn't enough now. + advanceMyLastAppliedOpTime(time3); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + + // 6) The node catches up time 4 eventually. + startCapturingLogMessages(); + advanceMyLastAppliedOpTime(time4); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + stopCapturingLogMessages(); + ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); +} + +TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) { + startCapturingLogMessages(); + + OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(100, 2), 0); + ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, true); + + // Run time far forward and ensure we are still in catchup mode. + // This is an arbitrary time 'far' into the future. + auto later = getNet()->now() + config.getElectionTimeoutPeriod() * 10; + replyHeartbeatsAndRunUntil(later, [this, &config, time2](const NetworkOpIter noi) { + // Other nodes are ahead of me. + getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); + + // Simulate the heartbeats from secondaries to primary to update liveness info. + // TODO(sz): Remove this after merging liveness info and heartbeats. + const RemoteCommandRequest& request = noi->getRequest(); + ReplSetHeartbeatArgsV1 hbArgs; + hbArgs.setConfigVersion(config.getConfigVersion()); + hbArgs.setSetName(config.getReplSetName()); + hbArgs.setSenderHost(request.target); + hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId()); + hbArgs.setTerm(getReplCoord()->getTerm()); + ASSERT(hbArgs.isInitialized()); + ReplSetHeartbeatResponse response; + ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); + }); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + + // Simulate a user initiated abort. + ASSERT_OK(getReplCoord()->abortCatchupIfNeeded()); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); + ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase("test")); +} + } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index dfff8f92030..b72d7e7dbcc 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -231,6 +231,13 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down. _signalStepDownWaiter_inlock(); + { + LockGuard lk(_mutex); + // Abort catchup if we have caught up to the latest known optime after heartbeat refreshing. + if (_catchupState) { + _catchupState->signalHeartbeatUpdate_inlock(); + } + } _scheduleHeartbeatToTarget( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); @@ -673,6 +680,9 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { } _scheduleHeartbeatToTarget(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); } + + _topCoord->restartHeartbeats(); + if (isV1ElectionProtocol()) { for (auto&& slaveInfo : _slaveInfo) { slaveInfo.lastUpdate = _replExecutor.now(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index b44dc3b4e40..6ee1b399cdd 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -107,11 +107,6 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn, replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); replCoord->waitForElectionFinish_forTest(); - // Wait for primary catch-up - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); @@ -5057,7 +5052,6 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { // Single node cluster - this node should start election on setFollowerMode() completion. replCoord->waitForElectionFinish_forTest(); - simulateCatchUpTimeout(); // Successful dry run election increases term. ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index f6ee65745fc..0650bf4f886 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -473,5 +473,9 @@ void ReplicationCoordinatorMock::alwaysAllowWrites(bool allowWrites) { _alwaysAllowWrites = allowWrites; } +Status ReplicationCoordinatorMock::abortCatchupIfNeeded() { + return Status::OK(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index c75702e73fa..d7d7f8e1d6b 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -273,6 +273,8 @@ public: */ void alwaysAllowWrites(bool allowWrites); + virtual Status abortCatchupIfNeeded() override; + private: AtomicUInt64 _snapshotNameGenerator; const ReplSettings _settings; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index ffc64712d8a..8c5b821b5b6 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -312,6 +312,10 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); + // The smallest valid optime in PV1. + OpTime opTime(Timestamp(), 0); + hbResp.setAppliedOpTime(opTime); + hbResp.setDurableOpTime(opTime); hbResp.setConfigVersion(rsConfig.getConfigVersion()); net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true))); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { @@ -477,32 +481,30 @@ void ReplCoordTest::disableSnapshots() { _externalState->setAreSnapshotsEnabled(false); } -void ReplCoordTest::simulateCatchUpTimeout() { +void ReplCoordTest::simulateCatchUpAbort() { NetworkInterfaceMock* net = getNet(); - auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod(); + auto heartbeatTimeoutWhen = + net->now() + getReplCoord()->getConfig().getHeartbeatTimeoutPeriodMillis(); bool hasRequest = false; net->enterNetwork(); - if (net->now() < catchUpTimeoutWhen) { - net->runUntil(catchUpTimeoutWhen); + if (net->now() < heartbeatTimeoutWhen) { + net->runUntil(heartbeatTimeoutWhen); } hasRequest = net->hasReadyRequests(); - net->exitNetwork(); - while (hasRequest) { - net->enterNetwork(); auto noi = net->getNextReadyRequest(); auto request = noi->getRequest(); // Black hole heartbeat requests caused by time advance. log() << "Black holing request to " << request.target.toString() << " : " << request.cmdObj; net->blackHole(noi); - if (net->now() < catchUpTimeoutWhen) { - net->runUntil(catchUpTimeoutWhen); + if (net->now() < heartbeatTimeoutWhen) { + net->runUntil(heartbeatTimeoutWhen); } else { net->runReadyNetworkOperations(); } hasRequest = net->hasReadyRequests(); - net->exitNetwork(); } + net->exitNetwork(); } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 6fe8b14803e..8ea48d3d32e 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -266,9 +266,9 @@ protected: void disableSnapshots(); /** - * Timeout all freshness scan request for primary catch-up. + * Timeout all heartbeat requests for primary catch-up. */ - void simulateCatchUpTimeout(); + void simulateCatchUpAbort(); private: std::unique_ptr<ReplicationCoordinatorImpl> _repl; diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 01ed33cb168..cddc8a4e8e2 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -495,6 +495,21 @@ public: */ virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0; + /** + * Reset the booleans to record the last heartbeat restart. + */ + virtual void restartHeartbeats() = 0; + + /** + * Scans through all members that are 'up' and return the latest known optime, if we have + * received (successful or failed) heartbeats from all nodes since heartbeat restart. + * + * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted + * heartbeats. + * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down. + */ + virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const = 0; + protected: TopologyCoordinator() {} }; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 732dedf8742..d277fdfb14d 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2645,5 +2645,35 @@ void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool support supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; } +void TopologyCoordinatorImpl::restartHeartbeats() { + for (auto& hb : _hbdata) { + hb.restart(); + } +} + +boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const { + // The smallest OpTime in PV1. + OpTime latest(Timestamp(0, 0), 0); + for (size_t i = 0; i < _hbdata.size(); i++) { + auto& peer = _hbdata[i]; + + if (static_cast<int>(i) == _selfIndex) { + continue; + } + // If any heartbeat is not fresh enough, return none. + if (!peer.isUpdatedSinceRestart()) { + return boost::none; + } + // Ignore down members + if (!peer.up()) { + continue; + } + if (peer.getAppliedOpTime() > latest) { + latest = peer.getAppliedOpTime(); + } + } + return latest; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index bbf4562604d..6a32dd8131c 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -245,6 +245,10 @@ public: bool isPriorityTakeover); virtual void setStorageEngineSupportsReadCommitted(bool supported); + virtual void restartHeartbeats(); + + virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; + //////////////////////////////////////////////////////////// // // Test support methods diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index 667cef8c430..fe345e13d69 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -864,9 +864,11 @@ var ReplSetTest = function(opts) { }; this.reInitiate = function() { - var config = this.getReplSetConfig(); - var newVersion = this.getReplSetConfigFromNode().version + 1; - config.version = newVersion; + var config = this.getReplSetConfigFromNode(); + var newConfig = this.getReplSetConfig(); + // Only reset members. + config.members = newConfig.members; + config.version += 1; this._setDefaultConfigOptions(config); |