diff options
author | Robert Guo <robert.guo@10gen.com> | 2017-04-20 10:58:37 -0400 |
---|---|---|
committer | Robert Guo <robert.guo@10gen.com> | 2017-04-20 10:58:57 -0400 |
commit | c08590a6ac9dc54c9d910822d47ea17140b56f89 (patch) | |
tree | 9a6986057f4453f858fac87d43b7435e56f5e807 | |
parent | fac33fe5a6814169c9c6131d80f1b325c74647da (diff) | |
download | mongo-c08590a6ac9dc54c9d910822d47ea17140b56f89.tar.gz |
Revert "SERVER-26848 Exit catchup mode when not syncing more data."
This reverts commit d0c851e2f4bfea514e22c97af1838640d2849a8c.
24 files changed, 330 insertions, 820 deletions
diff --git a/jstests/multiVersion/downgrade_replset.js b/jstests/multiVersion/downgrade_replset.js index 022471410a1..658b35813a2 100644 --- a/jstests/multiVersion/downgrade_replset.js +++ b/jstests/multiVersion/downgrade_replset.js @@ -14,45 +14,38 @@ var nodes = { n3: {binVersion: newVersion} }; -function runDowngradeTest(protocolVersion) { - var rst = new ReplSetTest({name: name, nodes: nodes}); - rst.startSet(); - var replSetConfig = rst.getReplSetConfig(); - replSetConfig.protocolVersion = protocolVersion; - // Hard-code catchup timeout to be compatible with 3.4 - replSetConfig.settings = {catchUpTimeoutMillis: 2000}; - rst.initiate(replSetConfig); - - var primary = rst.getPrimary(); - var coll = "test.foo"; - - jsTest.log("Inserting documents into collection."); - for (var i = 0; i < 10; i++) { - primary.getCollection(coll).insert({_id: i, str: "hello world"}); - } +var rst = new ReplSetTest({name: name, nodes: nodes}); +rst.startSet(); +var replSetConfig = rst.getReplSetConfig(); +replSetConfig.protocolVersion = 0; +rst.initiate(replSetConfig); + +var primary = rst.getPrimary(); +var coll = "test.foo"; + +jsTest.log("Inserting documents into collection."); +for (var i = 0; i < 10; i++) { + primary.getCollection(coll).insert({_id: i, str: "hello world"}); +} - function insertDocuments(rsURL, coll) { - var coll = new Mongo(rsURL).getCollection(coll); - var count = 10; - while (!isFinished()) { - assert.writeOK(coll.insert({_id: count, str: "hello world"})); - count++; - } +function insertDocuments(rsURL, coll) { + var coll = new Mongo(rsURL).getCollection(coll); + var count = 10; + while (!isFinished()) { + assert.writeOK(coll.insert({_id: count, str: "hello world"})); + count++; } +} - jsTest.log("Starting parallel operations during downgrade.."); - var joinFindInsert = startParallelOps(primary, insertDocuments, [rst.getURL(), coll]); - - jsTest.log("Downgrading replica set.."); - rst.upgradeSet({binVersion: oldVersion}); - jsTest.log("Downgrade complete."); +jsTest.log("Starting parallel operations during downgrade.."); +var joinFindInsert = startParallelOps(primary, insertDocuments, [rst.getURL(), coll]); - primary = rst.getPrimary(); - printjson(rst.status()); +jsTest.log("Downgrading replica set.."); +rst.upgradeSet({binVersion: oldVersion}); +jsTest.log("Downgrade complete."); - joinFindInsert(); - rst.stopSet(); -} +primary = rst.getPrimary(); +printjson(rst.status()); -runDowngradeTest(0); -runDowngradeTest(1); +joinFindInsert(); +rst.stopSet(); diff --git a/jstests/multiVersion/initialsync.js b/jstests/multiVersion/initialsync.js index a36d538a6f8..d8c1d629fd0 100644 --- a/jstests/multiVersion/initialsync.js +++ b/jstests/multiVersion/initialsync.js @@ -7,15 +7,13 @@ var newVersion = "latest"; var name = "multiversioninitsync"; -var multitest = function(replSetVersion, newNodeVersion, configSettings) { +var multitest = function(replSetVersion, newNodeVersion) { var nodes = {n1: {binVersion: replSetVersion}, n2: {binVersion: replSetVersion}}; print("Start up a two-node " + replSetVersion + " replica set."); var rst = new ReplSetTest({name: name, nodes: nodes}); rst.startSet(); - var conf = rst.getReplSetConfig(); - conf.settings = configSettings; - rst.initiate(conf); + rst.initiate(); // Wait for a primary node. var primary = rst.getPrimary(); @@ -52,5 +50,4 @@ multitest(oldVersion, newVersion); // Old Secondary is synced from a "latest" // version ReplSet. // ***************************************** -// Hard-code catchup timeout. The default timeout on 3.5 is -1, which is invalid on 3.4. -multitest(newVersion, oldVersion, {catchUpTimeoutMillis: 2000}); +multitest(newVersion, oldVersion); diff --git a/jstests/replsets/catchup.js b/jstests/replsets/catchup.js index 51632379463..542ad51c723 100644 --- a/jstests/replsets/catchup.js +++ b/jstests/replsets/catchup.js @@ -12,7 +12,6 @@ rst.startSet(); var conf = rst.getReplSetConfig(); - conf.members[2].priority = 0; conf.settings = { heartbeatIntervalMillis: 500, electionTimeoutMillis: 10000, @@ -35,7 +34,7 @@ node.adminCommand(verbosity); }); - function stepUpNode(node) { + function stepUp(node) { assert.soon(function() { node.adminCommand({replSetStepUp: 1}); return node.adminCommand('replSetGetStatus').myState == ReplSetTest.State.PRIMARY; @@ -44,6 +43,12 @@ return node; } + function doWrites(node) { + for (var i = 0; i < 3; i++) { + assert.writeOK(node.getDB("test").foo.insert({x: i})); + } + } + function checkOpInOplog(node, op, count) { node.getDB("admin").getMongo().setSlaveOk(); var oplog = node.getDB("local")['oplog.rs']; @@ -51,148 +56,98 @@ assert.eq(oplog.count(op), count, "op: " + tojson(op) + ", oplog: " + tojson(oplogArray)); } - // Stop replication on secondaries, do writes and step up one of the secondaries. - // - // The old primary has extra writes that are not replicated to the other nodes yet, - // but the new primary steps up, getting the vote from the the third node "voter". - function stopRelicationAndEnforceNewPrimaryToCatchUp() { - // Write documents that cannot be replicated to secondaries in time. - var oldSecondaries = rst.getSecondaries(); - var oldPrimary = rst.getPrimary(); - stopServerReplication(oldSecondaries); - for (var i = 0; i < 3; i++) { - assert.writeOK(oldPrimary.getDB("test").foo.insert({x: i})); + function isEarlierTimestamp(ts1, ts2) { + if (ts1.getTime() == ts2.getTime()) { + return ts1.getInc() < ts2.getInc(); } - var latestOpOnOldPrimary = getLatestOp(oldPrimary); - // New primary wins immediately, but needs to catch up. - var newPrimary = stepUpNode(oldSecondaries[0]); - rst.awaitNodesAgreeOnPrimary(); - var latestOpOnNewPrimary = getLatestOp(newPrimary); - // Check this node is not writable. - assert.eq(newPrimary.getDB("test").isMaster().ismaster, false); - - return { - oldSecondaries: oldSecondaries, - oldPrimary: oldPrimary, - newPrimary: newPrimary, - voter: oldSecondaries[1], - latestOpOnOldPrimary: latestOpOnOldPrimary, - latestOpOnNewPrimary: latestOpOnNewPrimary - }; - } - - function reconfigCatchUpTimeoutMillis(timeout) { - // Reconnect all nodes to make sure reconfig succeeds. - rst.nodes.forEach(reconnect); - // Reconfigure replicaset to decrease catchup timeout - conf = rst.getReplSetConfigFromNode(); - conf.version++; - conf.settings.catchUpTimeoutMillis = timeout; - reconfig(rst, conf); - rst.awaitReplication(); - rst.awaitNodesAgreeOnPrimary(); + return ts1.getTime() < ts2.getTime(); } - rst.awaitReplication(); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); - jsTest.log("Case 1: The primary is up-to-date after refreshing heartbeats."); + jsTest.log("Case 1: The primary is up-to-date after freshness scan."); // Should complete transition to primary immediately. - var newPrimary = stepUpNode(rst.getSecondary()); + var newPrimary = stepUp(rst.getSecondary()); rst.awaitNodesAgreeOnPrimary(); // Should win an election and finish the transition very quickly. assert.eq(newPrimary, rst.getPrimary()); - rst.awaitReplication(); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); jsTest.log("Case 2: The primary needs to catch up, succeeds in time."); - var stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); - + // Write documents that cannot be replicated to secondaries in time. + var originalSecondaries = rst.getSecondaries(); + stopServerReplication(originalSecondaries); + doWrites(rst.getPrimary()); + var latestOp = getLatestOp(rst.getPrimary()); + // New primary wins immediately, but needs to catch up. + newPrimary = stepUp(rst.getSecondary()); + rst.awaitNodesAgreeOnPrimary(); + // Check this node is not writable. + assert.eq(newPrimary.getDB("test").isMaster().ismaster, false); // Disable fail point to allow replication. - restartServerReplication(stepUpResults.oldSecondaries); + restartServerReplication(originalSecondaries); // getPrimary() blocks until the primary finishes drain mode. - assert.eq(stepUpResults.newPrimary, rst.getPrimary()); + assert.eq(newPrimary, rst.getPrimary()); // Wait for all secondaries to catch up rst.awaitReplication(); // Check the latest op on old primary is preserved on the new one. - checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1); - rst.awaitReplication(); + checkOpInOplog(newPrimary, latestOp, 1); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); jsTest.log("Case 3: The primary needs to catch up, but has to change sync source to catch up."); - stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); - - // Disable fail point on the voter. Wait until it catches up with the old primary. - restartServerReplication(stepUpResults.voter); - assert.commandWorked( - stepUpResults.voter.adminCommand({replSetSyncFrom: stepUpResults.oldPrimary.host})); - awaitOpTime(stepUpResults.voter, stepUpResults.latestOpOnOldPrimary.ts); + // Write documents that cannot be replicated to secondaries in time. + stopServerReplication(rst.getSecondaries()); + doWrites(rst.getPrimary()); + var oldPrimary = rst.getPrimary(); + originalSecondaries = rst.getSecondaries(); + latestOp = getLatestOp(oldPrimary); + newPrimary = stepUp(originalSecondaries[0]); + rst.awaitNodesAgreeOnPrimary(); + // Disable fail point on one of the other secondaries. + // Wait until it catches up with the old primary. + restartServerReplication(originalSecondaries[1]); + assert.commandWorked(originalSecondaries[1].adminCommand({replSetSyncFrom: oldPrimary.host})); + awaitOpTime(originalSecondaries[1], latestOp.ts); // Disconnect the new primary and the old one. - stepUpResults.oldPrimary.disconnect(stepUpResults.newPrimary); + oldPrimary.disconnect(newPrimary); // Disable the failpoint, the new primary should sync from the other secondary. - restartServerReplication(stepUpResults.newPrimary); - assert.eq(stepUpResults.newPrimary, rst.getPrimary()); - checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1); + restartServerReplication(newPrimary); + assert.eq(newPrimary, rst.getPrimary()); + checkOpInOplog(newPrimary, latestOp, 1); // Restore the broken connection - stepUpResults.oldPrimary.reconnect(stepUpResults.newPrimary); - rst.awaitReplication(); + oldPrimary.reconnect(newPrimary); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); jsTest.log("Case 4: The primary needs to catch up, fails due to timeout."); - reconfigCatchUpTimeoutMillis(10 * 1000); - - stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); - // Wait until the new primary completes the transition to primary and writes a no-op. - checkLog.contains(stepUpResults.newPrimary, "Catchup timed out after becoming primary"); - restartServerReplication(stepUpResults.newPrimary); - assert.eq(stepUpResults.newPrimary, rst.getPrimary()); - - // Wait for the no-op "new primary" after winning an election, so that we know it has - // finished transition to primary. - assert.soon(function() { - return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary, - getLatestOp(stepUpResults.newPrimary)) < 0; - }); - // The extra oplog entries on the old primary are not replicated to the new one. - checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0); - restartServerReplication(stepUpResults.voter); - rst.awaitReplication(); + // Reconfigure replicaset to decrease catchup timeout + conf = rst.getReplSetConfigFromNode(); + conf.version++; + conf.settings.catchUpTimeoutMillis = 10 * 1000; + reconfig(rst, conf); + rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE); + rst.awaitNodesAgreeOnPrimary(); - jsTest.log("Case 5: The primary needs to catch up with no timeout, then gets aborted."); - reconfigCatchUpTimeoutMillis(-1); - stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); + // Write documents that cannot be replicated to secondaries in time. + originalSecondaries = rst.getSecondaries(); + stopServerReplication(originalSecondaries); + doWrites(rst.getPrimary()); + latestOp = getLatestOp(rst.getPrimary()); - // Abort catchup. - assert.commandWorked(stepUpResults.newPrimary.adminCommand({replSetAbortPrimaryCatchUp: 1})); + // New primary wins immediately, but needs to catch up. + newPrimary = stepUp(originalSecondaries[0]); + rst.awaitNodesAgreeOnPrimary(); + var latestOpOnNewPrimary = getLatestOp(newPrimary); + // Wait until the new primary completes the transition to primary and writes a no-op. + checkLog.contains(newPrimary, "Cannot catch up oplog after becoming primary"); + restartServerReplication(newPrimary); + assert.eq(newPrimary, rst.getPrimary()); // Wait for the no-op "new primary" after winning an election, so that we know it has // finished transition to primary. assert.soon(function() { - return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary, - getLatestOp(stepUpResults.newPrimary)) < 0; + return isEarlierTimestamp(latestOpOnNewPrimary.ts, getLatestOp(newPrimary).ts); }); // The extra oplog entries on the old primary are not replicated to the new one. - checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0); - restartServerReplication(stepUpResults.oldSecondaries); - rst.awaitReplication(); - checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0); - - // TODO: Uncomment case 6 when SERVER-28751 gets fixed. - // - // jsTest.log("Case 6: The primary needs to catch up with no timeout, but steps down."); - // var stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp(); - - // // Step-down command should abort catchup. - // try { - // printjson(stepUpResults.newPrimary.adminCommand({replSetStepDown: 60})); - // } catch (e) { - // print(e); - // } - // // Rename the primary. - // var steppedDownPrimary = stepUpResults.newPrimary; - // var newPrimary = rst.getPrimary(); - // assert.neq(newPrimary, steppedDownPrimary); - - // // Enable data replication on the stepped down primary and make sure it syncs old writes. - // rst.nodes.forEach(reconnect); - // restartServerReplication(stepUpResults.oldSecondaries); - // rst.awaitReplication(); - // checkOpInOplog(steppedDownPrimary, stepUpResults.latestOpOnOldPrimary, 1); - + checkOpInOplog(newPrimary, latestOp, 0); + restartServerReplication(originalSecondaries[1]); })(); diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index 5911723d717..1471824bd8f 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -162,7 +162,6 @@ var getLastOpTime; if (!isNetworkError(e)) { throw e; } - print("Calling replSetReconfig failed. " + tojson(e)); } var master = rs.getPrimary().getDB("admin"); diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 9d129284de0..df792fa6fdc 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -313,12 +313,13 @@ void BackgroundSync::_produce(OperationContext* opCtx) { if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) { // All (accessible) sync sources were too stale. + // TODO: End catchup mode early if we are too stale. if (_replCoord->getMemberState().primary()) { warning() << "Too stale to catch up."; log() << "Our newest OpTime : " << lastOpTimeFetched; log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen << " from " << syncSourceResp.getSyncSource(); - _replCoord->abortCatchupIfNeeded(); + sleepsecs(1); return; } @@ -567,8 +568,9 @@ void BackgroundSync::_runRollback(OperationContext* opCtx, int requiredRBID, StorageInterface* storageInterface) { if (_replCoord->getMemberState().primary()) { - warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode."; - _replCoord->abortCatchupIfNeeded(); + // TODO: Abort catchup mode early if rollback detected. + warning() << "Rollback situation detected in catch-up mode; catch-up mode will end."; + sleepsecs(1); return; } diff --git a/src/mongo/db/repl/member_heartbeat_data.cpp b/src/mongo/db/repl/member_heartbeat_data.cpp index 1b9b9ea3f13..c267a6ba8ed 100644 --- a/src/mongo/db/repl/member_heartbeat_data.cpp +++ b/src/mongo/db/repl/member_heartbeat_data.cpp @@ -54,8 +54,6 @@ void MemberHeartbeatData::setUpValues(Date_t now, } _authIssue = false; _lastHeartbeat = now; - _updatedSinceRestart = true; - if (!hbResponse.hasState()) { hbResponse.setState(MemberState::RS_UNKNOWN); } @@ -79,7 +77,6 @@ void MemberHeartbeatData::setDownValues(Date_t now, const std::string& heartbeat _upSince = Date_t(); _lastHeartbeat = now; _authIssue = false; - _updatedSinceRestart = true; _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_DOWN); @@ -94,7 +91,6 @@ void MemberHeartbeatData::setAuthIssue(Date_t now) { _upSince = Date_t(); _lastHeartbeat = now; _authIssue = true; - _updatedSinceRestart = true; _lastResponse = ReplSetHeartbeatResponse(); _lastResponse.setState(MemberState::RS_UNKNOWN); diff --git a/src/mongo/db/repl/member_heartbeat_data.h b/src/mongo/db/repl/member_heartbeat_data.h index f67a0a87757..0f5dacd9081 100644 --- a/src/mongo/db/repl/member_heartbeat_data.h +++ b/src/mongo/db/repl/member_heartbeat_data.h @@ -123,17 +123,6 @@ public: */ void setAuthIssue(Date_t now); - /** - * Reset the boolean to record the last restart. - */ - void restart() { - _updatedSinceRestart = false; - } - - bool isUpdatedSinceRestart() const { - return _updatedSinceRestart; - } - private: // -1 = not checked yet, 0 = member is down/unreachable, 1 = member is up int _health; @@ -150,9 +139,6 @@ private: // The last heartbeat response we received. ReplSetHeartbeatResponse _lastResponse; - - // Have we received heartbeats since the last restart? - bool _updatedSinceRestart = false; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp index 09226ba0237..6348f72bfbd 100644 --- a/src/mongo/db/repl/repl_set_commands.cpp +++ b/src/mongo/db/repl/repl_set_commands.cpp @@ -868,7 +868,7 @@ public: status = getGlobalReplicationCoordinator()->stepUpIfEligible(); if (!status.isOK()) { - log() << "replSetStepUp request failed" << causedBy(status); + log() << "replSetStepUp request failed " << causedBy(status); } return appendCommandStatus(result, status); @@ -880,38 +880,5 @@ private: } } cmdReplSetStepUp; -class CmdReplSetAbortPrimaryCatchUp : public ReplSetCommand { -public: - virtual void help(stringstream& help) const { - help << "{ CmdReplSetAbortPrimaryCatchUp : 1 }\n"; - help << "Abort primary catch-up mode; immediately finish the transition to primary " - "without fetching any further unreplicated writes from any other online nodes"; - } - - CmdReplSetAbortPrimaryCatchUp() : ReplSetCommand("replSetAbortPrimaryCatchUp") {} - - virtual bool run(OperationContext* opCtx, - const string&, - BSONObj& cmdObj, - string& errmsg, - BSONObjBuilder& result) override { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); - if (!status.isOK()) - return appendCommandStatus(result, status); - log() << "Received replSetAbortPrimaryCatchUp request"; - - status = getGlobalReplicationCoordinator()->abortCatchupIfNeeded(); - if (!status.isOK()) { - log() << "replSetAbortPrimaryCatchUp request failed" << causedBy(status); - } - return appendCommandStatus(result, status); - } - -private: - ActionSet getAuthActionSet() const override { - return ActionSet{ActionType::replSetStateChange}; - } -} cmdReplSetAbortPrimaryCatchUp; - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index 2fbbd7cad52..f7140cc56bf 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -44,7 +44,6 @@ namespace repl { const size_t ReplSetConfig::kMaxMembers; const size_t ReplSetConfig::kMaxVotingMembers; -const Milliseconds ReplSetConfig::kInfiniteCatchUpTimeout(-1); const std::string ReplSetConfig::kConfigServerFieldName = "configsvr"; const std::string ReplSetConfig::kVersionFieldName = "version"; @@ -52,7 +51,7 @@ const std::string ReplSetConfig::kMajorityWriteConcernModeName = "$majority"; const Milliseconds ReplSetConfig::kDefaultHeartbeatInterval(2000); const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10); const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000); -const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(kInfiniteCatchUpTimeout); +const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(2000); const bool ReplSetConfig::kDefaultChainingAllowed(true); namespace { @@ -271,14 +270,14 @@ Status ReplSetConfig::_parseSettingsSubdocument(const BSONObj& settings) { // // Parse catchUpTimeoutMillis // - auto validCatchUpTimeout = [](long long timeout) { return timeout >= 0LL || timeout == -1LL; }; + auto notLessThanZero = stdx::bind(std::greater_equal<long long>(), stdx::placeholders::_1, 0); long long catchUpTimeoutMillis; Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf( settings, kCatchUpTimeoutFieldName, durationCount<Milliseconds>(kDefaultCatchUpTimeoutPeriod), - validCatchUpTimeout, - "catch-up timeout must be positive, 0 (no catch-up) or -1 (infinite catch-up).", + notLessThanZero, + "catch-up timeout must be greater than or equal to 0", &catchUpTimeoutMillis); if (!catchUpTimeoutStatus.isOK()) { return catchUpTimeoutStatus; diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index 63d5bcb80e8..e44e51b12a7 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -59,7 +59,6 @@ public: static const size_t kMaxMembers = 50; static const size_t kMaxVotingMembers = 7; - static const Milliseconds kInfiniteCatchUpTimeout; static const Milliseconds kDefaultElectionTimeoutPeriod; static const Milliseconds kDefaultHeartbeatInterval; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index c53f5a7b655..8446e718d30 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -880,11 +880,6 @@ public: virtual ServiceContext* getServiceContext() = 0; - /** - * Abort catchup if the node is in catchup mode. - */ - virtual Status abortCatchupIfNeeded() = 0; - protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 301f2c92c7c..f42e712ffd3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -155,45 +155,54 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const { return toBSON().toString(); } -ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern) - : opTime(std::move(_opTime)), writeConcern(_writeConcern) {} +struct ReplicationCoordinatorImpl::WaiterInfo { -BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const { - BSONObjBuilder bob; - bob.append("opTime", opTime.toBSON()); - if (writeConcern) { - bob.append("writeConcern", writeConcern->toBSON()); - } - return bob.obj(); -}; - -std::string ReplicationCoordinatorImpl::Waiter::toString() const { - return toBSON().toString(); -}; + using FinishFunc = stdx::function<void()>; + WaiterInfo(unsigned int _opID, + const OpTime _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar) + : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {} -ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar) - : Waiter(_opTime, _writeConcern), condVar(_condVar) {} + // When waiter is signaled, finishCallback will be called while holding replCoord _mutex + // since WaiterLists are protected by _mutex. + WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback) + : opTime(_opTime), finishCallback(_finishCallback) {} -void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() { - invariant(condVar); - condVar->notify_all(); -} + BSONObj toBSON() const { + BSONObjBuilder bob; + bob.append("opId", opID); + bob.append("opTime", opTime.toBSON()); + if (writeConcern) { + bob.append("writeConcern", writeConcern->toBSON()); + } + return bob.obj(); + }; -ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime, - FinishFunc _finishCallback) - : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {} + std::string toString() const { + return toBSON().toString(); + }; -void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() { - invariant(finishCallback); - finishCallback(); -} + // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex. + void notify() { + if (condVar) { + condVar->notify_all(); + } + if (finishCallback) { + finishCallback(); + } + } + const unsigned int opID = 0; + const OpTime opTime; + const WriteConcernOptions* writeConcern = nullptr; + stdx::condition_variable* condVar = nullptr; + // The callback that will be called when this waiter is notified. + FinishFunc finishCallback = nullptr; +}; -class ReplicationCoordinatorImpl::WaiterGuard { -public: +struct ReplicationCoordinatorImpl::WaiterInfoGuard { /** * Constructor takes the list of waiters and enqueues itself on the list, removing itself * in the destructor. @@ -205,17 +214,23 @@ public: * _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one * of these without holding _mutex */ - WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) { - list->add_inlock(_waiter); + WaiterInfoGuard(WaiterList* list, + unsigned int opID, + const OpTime opTime, + const WriteConcernOptions* writeConcern, + stdx::condition_variable* condVar) + : waiter(opID, opTime, writeConcern, condVar), _list(list) { + list->add_inlock(&waiter); } - ~WaiterGuard() { - _list->remove_inlock(_waiter); + ~WaiterInfoGuard() { + _list->remove_inlock(&waiter); } + WaiterInfo waiter; + private: WaiterList* _list; - Waiter* _waiter; }; void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { @@ -224,46 +239,33 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) { void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock( stdx::function<bool(WaiterType)> func) { - // Only advance iterator when the element doesn't match. - for (auto it = _list.begin(); it != _list.end();) { - if (!func(*it)) { - ++it; - continue; - } - - WaiterType waiter = std::move(*it); - if (it == std::prev(_list.end())) { - // Iterator will be invalid after erasing the last element, so set it to the - // next one (i.e. end()). - it = _list.erase(it); - } else { - // Iterator is still valid after pop_back(). - std::swap(*it, _list.back()); - _list.pop_back(); + std::vector<WaiterType>::iterator it = _list.end(); + while (true) { + it = std::find_if(_list.begin(), _list.end(), func); + if (it == _list.end()) { + break; } - - // It's important to call notify() after the waiter has been removed from the list - // since notify() might remove the waiter itself. - waiter->notify_inlock(); + (*it)->notify(); + std::swap(*it, _list.back()); + _list.pop_back(); } } void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() { - std::vector<WaiterType> list = std::move(_list); - // Call notify() after removing the waiters from the list. - for (auto& waiter : list) { - waiter->notify_inlock(); + for (auto& waiter : _list) { + waiter->notify(); } + _list.clear(); } bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) { auto it = std::find(_list.begin(), _list.end(), waiter); - if (it == _list.end()) { - return false; + if (it != _list.end()) { + std::swap(*it, _list.back()); + _list.pop_back(); + return true; } - std::swap(*it, _list.back()); - _list.pop_back(); - return true; + return false; } namespace { @@ -1184,7 +1186,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op _updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime); _opTimeWaiterList.signalAndRemoveIf_inlock( - [opTime](Waiter* waiter) { return waiter->opTime <= opTime; }); + [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; }); } void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime, @@ -1350,11 +1352,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( // We just need to wait for the opTime to catch up to what we need (not majority RC). stdx::condition_variable condVar; - ThreadWaiter waiter(targetOpTime, nullptr, &condVar); - WaiterGuard guard(&_opTimeWaiterList, &waiter); + WaiterInfoGuard waitInfo( + &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar); - LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime " - << waiter << " until " << opCtx->getDeadline(); + LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until " + << opCtx->getDeadline(); auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock); if (!waitStatus.isOK()) { @@ -1742,8 +1744,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; - ThreadWaiter waiter(opTime, &writeConcern, &condVar); - WaiterGuard guard(&_replicationWaiterList, &waiter); + WaiterInfoGuard waitInfo( + &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { if (_inShutdown) { @@ -1761,7 +1763,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock( BSONObjBuilder progress; _appendSlaveInfoData_inlock(&progress); log() << "Replication for failed WC: " << writeConcern.toBSON() - << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID() + << ", waitInfo:" << waitInfo.waiter.toBSON() << ", progress: " << progress.done(); } return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; @@ -2658,11 +2660,8 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { result = kActionFollowerModeStateChange; } - // Exit catchup mode if we're in it and enable replication producer and applier on stepdown. + // Enable replication producer and applier on stepdown. if (_memberState.primary()) { - if (_catchupState) { - _catchupState->abort_inlock(); - } _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -2766,18 +2765,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( invariant(nextAction != kActionWinElection); lk.unlock(); _performPostMemberStateUpdateAction(nextAction); - lk.lock(); - if (!_getMemberState_inlock().primary()) { - break; - } // Notify all secondaries of the election win. - _restartHeartbeats_inlock(); + lk.lock(); + _scheduleElectionWinNotification_inlock(); if (isV1ElectionProtocol()) { - invariant(!_catchupState); - _catchupState = stdx::make_unique<CatchupState>(this); - _catchupState->start_inlock(); + _scanOpTimeForCatchUp_inlock(); } else { - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); } break; } @@ -2792,114 +2786,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( } } -void ReplicationCoordinatorImpl::CatchupState::start_inlock() { - log() << "Entering primary catch-up mode."; - - // No catchup in single node replica set. - if (_repl->_rsConfig.getNumMembers() == 1) { - abort_inlock(); - return; - } - - auto timeoutCB = [this](const CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - return; - } - log() << "Catchup timed out after becoming primary."; - stdx::lock_guard<stdx::mutex> lk(_repl->_mutex); - abort_inlock(); - }; - - // Schedule timeout callback. - auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod(); - // Deal with infinity and overflow - no timeout. - if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout || - Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) { - return; - } - auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout; - auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB); - if (!status.isOK()) { - log() << "Failed to schedule catchup timeout work."; - abort_inlock(); - return; - } - _timeoutCbh = status.getValue(); -} - -void ReplicationCoordinatorImpl::CatchupState::abort_inlock() { - invariant(_repl->_getMemberState_inlock().primary()); - - log() << "Exited primary catch-up mode."; - // Clean up its own members. - if (_timeoutCbh) { - _repl->_replExecutor->cancel(_timeoutCbh); - } - if (_waiter) { - _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); - } - - // Enter primary drain mode. - _repl->_enterDrainMode_inlock(); - // Destruct the state itself. - _repl->_catchupState.reset(nullptr); -} - -void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() { - auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart(); - // Haven't collected all heartbeat responses. - if (!targetOpTime) { - return; - } - - // We've caught up. - if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { - log() << "Caught up to the latest known optime via heartbeats after becoming primary."; - abort_inlock(); - return; - } - - // Reset the target optime if it has changed. - if (_waiter && _waiter->opTime == *targetOpTime) { - return; - } - - log() << "Heartbeats updated catchup target optime to " << *targetOpTime; - if (_waiter) { - _repl->_opTimeWaiterList.remove_inlock(_waiter.get()); - } - auto targetOpTimeCB = [this, targetOpTime]() { - // Double check the target time since stepdown may signal us too. - if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) { - log() << "Caught up to the latest known optime successfully after becoming primary."; - abort_inlock(); - } - }; - _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB); - _repl->_opTimeWaiterList.add_inlock(_waiter.get()); -} - -Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() { - if (!isV1ElectionProtocol()) { - return Status(ErrorCodes::CommandNotSupported, - "Primary catch-up is only supported by Protocol Version 1"); - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_catchupState) { - _catchupState->abort_inlock(); - return Status::OK(); - } - return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode."); -} - void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { auto scanner = std::make_shared<FreshnessScanner>(); auto scanStartTime = _replExecutor->now(); auto evhStatus = scanner->start( _replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod()); if (evhStatus == ErrorCodes::ShutdownInProgress) { - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } fassertStatusOK(40254, evhStatus.getStatus()); @@ -2908,7 +2801,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() { evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (cbData.status == ErrorCodes::CallbackCanceled) { - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod(); @@ -2937,7 +2830,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Could not access any nodes within timeout when checking for " << "additional ops to apply before finishing transition to primary. " << "Will move forward with becoming primary anyway."; - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } @@ -2946,7 +2839,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) { log() << "My optime is most up-to-date, skipping catch-up " << "and completing transition to primary."; - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); return; } @@ -2959,9 +2852,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca log() << "Finished catch-up oplog after becoming primary."; } - _enterDrainMode_inlock(); + _finishCatchingUpOplog_inlock(); }; - auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB); + auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB); _opTimeWaiterList.add_inlock(waiterInfo.get()); auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) { @@ -2974,7 +2867,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca _replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB); } -void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { +void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() { _applierState = ApplierState::Draining; _externalState->stopProducer(); } @@ -3065,7 +2958,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC } void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { - _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) { + _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) { return _doneWaitingForReplication_inlock( waiter->opTime, SnapshotName::min(), *waiter->writeConcern); }); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 404bf0e16b9..c5f94f32e97 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -333,8 +333,6 @@ public: virtual Status stepUpIfEligible() override; - virtual Status abortCatchupIfNeeded() override; - // ================== Test support API =================== /** @@ -484,55 +482,16 @@ private: kActionStartSingleNodeElection }; - // Abstract struct that holds information about clients waiting for replication. - // Subclasses need to define how to notify them. - struct Waiter { - Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern); - virtual ~Waiter() = default; - - BSONObj toBSON() const; - std::string toString() const; - // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex. - virtual void notify_inlock() = 0; - - const OpTime opTime; - const WriteConcernOptions* writeConcern = nullptr; - }; - - // When ThreadWaiter gets notified, it will signal the conditional variable. - // - // This is used when a thread wants to block inline until the opTime is reached with the given - // writeConcern. - struct ThreadWaiter : public Waiter { - ThreadWaiter(OpTime _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar); - void notify_inlock() override; - - stdx::condition_variable* condVar = nullptr; - }; - - // When the waiter is notified, finishCallback will be called while holding replCoord _mutex - // since WaiterLists are protected by _mutex. - // - // This is used when we want to run a callback when the opTime is reached. - struct CallbackWaiter : public Waiter { - using FinishFunc = stdx::function<void()>; - - CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback); - void notify_inlock() override; - - // The callback that will be called when this waiter is notified. - FinishFunc finishCallback = nullptr; - }; - - class WaiterGuard; + // Struct that holds information about clients waiting for replication. + struct WaiterInfo; + struct WaiterInfoGuard; class WaiterList { public: - using WaiterType = Waiter*; + using WaiterType = WaiterInfo*; - // Adds waiter into the list. + // Adds waiter into the list. Usually, the waiter will be signaled only once and then + // removed. void add_inlock(WaiterType waiter); // Returns whether waiter is found and removed. bool remove_inlock(WaiterType waiter); @@ -569,44 +528,6 @@ private: typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles; - // The state and logic of primary catchup. - // - // When start() is called, CatchupState will schedule the timeout callback. When we get - // responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is - // set. - // The primary exits catchup mode when any of the following happens. - // 1) My last applied optime reaches the target optime, if we've received a heartbeat from all - // nodes. - // 2) Catchup timeout expires. - // 3) Primary steps down. - // 4) The primary has to roll back to catch up. - // 5) The primary is too stale to catch up. - // - // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the - // life cycle of the state object aligns with the conceptual state. - // In shutdown, the timeout callback will be canceled by the executor and the state is safe to - // destroy. - // - // Any function of the state must be called while holding _mutex. - class CatchupState { - public: - CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {} - // start() can only be called once. - void start_inlock(); - // Reset the state itself to destruct the state. - void abort_inlock(); - // Heartbeat calls this function to update the target optime. - void signalHeartbeatUpdate_inlock(); - - private: - ReplicationCoordinatorImpl* _repl; // Not owned. - // Callback handle used to cancel a scheduled catchup timeout callback. - ReplicationExecutor::CallbackHandle _timeoutCbh; - // Handle to a Waiter that contains the current target optime to reach after which - // we can exit catchup mode. - std::unique_ptr<CallbackWaiter> _waiter; - }; - /** * Appends a "replicationProgress" section with data for each member in set. */ @@ -1247,7 +1168,7 @@ private: /** * Finish catch-up mode and start drain mode. */ - void _enterDrainMode_inlock(); + void _finishCatchingUpOplog_inlock(); /** * Waits for the config state to leave kConfigStartingUp, which indicates that start() has @@ -1483,10 +1404,6 @@ private: mutable stdx::mutex _indexPrefetchMutex; ReplSettings::IndexPrefetchConfig _indexPrefetchConfig = ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I) - - // The catchup state including all catchup logic. The presence of a non-null pointer indicates - // that the node is currently in catchup mode. - std::unique_ptr<CatchupState> _catchupState; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 7588fb166d5..1075a7e9232 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -160,7 +160,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - simulateCatchUpAbort(); + simulateCatchUpTimeout(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); @@ -223,6 +223,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->waitForElectionFinish_forTest(); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); + // Wait for catchup check to finish. + simulateCatchUpTimeout(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); @@ -1278,19 +1280,15 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) class PrimaryCatchUpTest : public ReplCoordTest { protected: using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator; - using NetworkRequestFn = stdx::function<void(const NetworkOpIter)>; + using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>; - const Timestamp smallTimestamp{1, 1}; - - executor::RemoteCommandResponse makeHeartbeatResponse(OpTime opTime) { + void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) { ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest(); ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); - return makeResponseStatus(hbResp.toBSON(true)); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true))); } void simulateSuccessfulV1Voting() { @@ -1302,9 +1300,10 @@ protected: log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)"; ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); - // Process requests until we're primary but leave the heartbeats for the notification + bool hasReadyRequests = true; + // Process requests until we're primary and consume the heartbeats for the notification // of election win. Exit immediately on unexpected requests. - while (!replCoord->getMemberState().primary()) { + while (!replCoord->getMemberState().primary() || hasReadyRequests) { log() << "Waiting on network in state " << replCoord->getMemberState(); net->enterNetwork(); if (net->now() < electionTimeoutWhen) { @@ -1315,9 +1314,7 @@ protected: const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { - OpTime opTime(Timestamp(), getReplCoord()->getTerm()); - net->scheduleResponse( - net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime)); + replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { net->scheduleResponse(net->getNextReadyRequest(), @@ -1339,11 +1336,12 @@ protected: // executor. getReplExec()->waitForDBWork_forTest(); net->runReadyNetworkOperations(); + hasReadyRequests = net->hasReadyRequests(); net->exitNetwork(); } } - ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, bool infiniteTimeout = false) { + ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) { BSONObj configObj = BSON("_id" << "mySet" << "version" @@ -1358,8 +1356,7 @@ protected: << "protocolVersion" << 1 << "settings" - << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis" - << (infiniteTimeout ? -1 : 5000))); + << BSON("catchUpTimeoutMillis" << 5000)); assertStartSuccess(configObj, HostAndPort("node1", 12345)); ReplSetConfig config = assertMakeRSConfig(configObj); @@ -1381,15 +1378,17 @@ protected: return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON()))); } - void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) { + void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) { NetworkInterfaceMock* net = getNet(); net->enterNetwork(); while (net->hasReadyRequests()) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; - if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { - onHeartbeatRequest(noi); + if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") { + onFreshnessScanRequest(noi); + } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { + replyToHeartbeatRequestAsSecondaries(noi); } else { log() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; @@ -1400,8 +1399,7 @@ protected: net->exitNetwork(); } - // Response heartbeats with opTime until the given time. Exit if it sees any other request. - void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) { + void replyHeartbeatsAndRunUntil(Date_t until) { auto net = getNet(); net->enterNetwork(); while (net->now() < until) { @@ -1409,10 +1407,9 @@ protected: // Peek the next request auto noi = net->getFrontOfUnscheduledQueue(); auto& request = noi->getRequest(); - log() << request.target << " at " << net->now() << " processing " << request.cmdObj; if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) { // Consume the next request - onHeartbeatRequest(net->getNextReadyRequest()); + replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest()); } else { // Cannot consume other requests than heartbeats. net->exitNetwork(); @@ -1423,153 +1420,126 @@ protected: } net->exitNetwork(); } - - // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal - // the optime waiter. - void advanceMyLastAppliedOpTime(OpTime opTime) { - getReplCoord()->setMyLastAppliedOpTime(opTime); - getNet()->enterNetwork(); - getNet()->runReadyNetworkOperations(); - getNet()->exitNetwork(); - } }; -// The first round of heartbeats indicates we are the most up-to-date. -TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) { +TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - int count = 0; - processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) { - count++; - auto net = getNet(); - // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1)); + processFreshnessScanRequests([this](const NetworkOpIter noi) { + getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); }); - - // Get 2 heartbeats from secondaries. - ASSERT_EQUALS(2, count); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -// Heartbeats set a future target OpTime and we reached that successfully. -TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { +TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); - OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - processHeartbeatRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); + + processFreshnessScanRequests([this](const NetworkOpIter noi) { + auto request = noi->getRequest(); + log() << "Black holing request to " << request.target << ": " << request.cmdObj; + getNet()->blackHole(noi); }); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - advanceMyLastAppliedOpTime(time2); + + auto net = getNet(); + replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod()); + ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); + ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, CatchupTimeout) { +TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); - replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) { - // Other nodes are ahead of me. - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); - }); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); - stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out")); - auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); - Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); - ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); -} -TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) { - startCapturingLogMessages(); - - OpTime time1(Timestamp(100, 1), 0); - ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - // We should get caught up by the timeout time. - auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); - replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) { - const RemoteCommandRequest& request = noi->getRequest(); - if (request.target.host() == "node2") { - auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host"); - getNet()->scheduleResponse(noi, getNet()->now(), status); - } else { - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); - } + processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + // The old primary accepted one more op and all nodes caught up after voting for me. + net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); }); + + NetworkInterfaceMock* net = getNet(); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + // Simulate the work done by bgsync and applier threads. + // setMyLastAppliedOpTime() will signal the optime waiter. + getReplCoord()->setMyLastAppliedOpTime(time2); + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) { +TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); + OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - // We should get caught up by the timeout time. - auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod(); - replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) { - const RemoteCommandRequest& request = noi->getRequest(); - if (request.target.host() == "node2") { - log() << "Black holing heartbeat from " << request.target.host(); - getNet()->blackHole(noi); - } else { - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); - } + + // The new primary learns of the latest OpTime. + processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); }); + + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats")); + ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); auto opCtx = makeOperationContext(); getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) { +TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) { startCapturingLogMessages(); OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - // Step down immediately. + + processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { + auto request = noi->getRequest(); + log() << "Black holing request to " << request.target << ": " << request.cmdObj; + getNet()->blackHole(noi); + }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); + replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); - ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); - ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); + ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary")); auto opCtx = makeOperationContext(); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); @@ -1581,25 +1551,30 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { OpTime time1(Timestamp(100, 1), 0); OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - // Step down in the middle of catchup. - auto abortTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2; - replyHeartbeatsAndRunUntil(abortTime, [this, time2](const NetworkOpIter noi) { - // Other nodes are ahead of me. - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); + + processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { + auto net = getNet(); + // The old primary accepted one more op and all nodes caught up after voting for me. + net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); }); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); + auto net = getNet(); + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + auto opCtx = makeOperationContext(); + // Simulate the applier signaling replCoord to exit drain mode. + // At this point, we see the stepdown and reset the states. + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); - ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); - ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); - auto opCtx = makeOperationContext(); + ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary")); Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -1611,17 +1586,24 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - processHeartbeatRequests([this, time2](const NetworkOpIter noi) { + processFreshnessScanRequests([this, time2](const NetworkOpIter noi) { auto net = getNet(); // The old primary accepted one more op and all nodes caught up after voting for me. - net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); + net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2)); }); + + NetworkInterfaceMock* net = getNet(); ReplicationCoordinatorImpl* replCoord = getReplCoord(); ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - advanceMyLastAppliedOpTime(time2); + // Simulate the work done by bgsync and applier threads. + // setMyLastAppliedOpTime() will signal the optime waiter. + replCoord->setMyLastAppliedOpTime(time2); + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); ASSERT(replCoord->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime")); + ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary.")); // Step down during drain mode. TopologyCoordinator::UpdateTermResult updateTermResult; @@ -1635,10 +1617,9 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { simulateSuccessfulV1Voting(); ASSERT_TRUE(replCoord->getMemberState().primary()); - // No need to catch up, so we enter drain mode. - processHeartbeatRequests([this, time2](const NetworkOpIter noi) { - auto net = getNet(); - net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); + // No need to catch-up, so we enter drain mode. + processFreshnessScanRequests([this](const NetworkOpIter noi) { + getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime())); }); ASSERT(replCoord->getApplierState() == ApplierState::Draining); auto opCtx = makeOperationContext(); @@ -1652,113 +1633,6 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); } -TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { - OpTime time1(Timestamp(100, 1), 0); - OpTime time2(Timestamp(200, 1), 0); - OpTime time3(Timestamp(300, 1), 0); - OpTime time4(Timestamp(400, 1), 0); - - // 1) The primary is at time 1 at the beginning. - ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); - - // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available. - // So the target optime is time 3. - startCapturingLogMessages(); - auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3; - replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) { - const RemoteCommandRequest& request = noi->getRequest(); - if (request.target.host() == "node2") { - auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host"); - getNet()->scheduleResponse(noi, getNet()->now(), status); - } else { - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3)); - } - }); - // The node is still in catchup mode, but the target optime has been set. - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - stopCapturingLogMessages(); - ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); - - // 3) Advancing its applied optime to time 2 isn't enough. - advanceMyLastAppliedOpTime(time2); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - - // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target. - startCapturingLogMessages(); - auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3; - replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) { - const RemoteCommandRequest& request = noi->getRequest(); - if (request.target.host() == "node2") { - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4)); - } else { - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3)); - } - }); - // The node is still in catchup mode, but the target optime has been updated. - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - stopCapturingLogMessages(); - ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); - - // 5) Advancing to time 3 isn't enough now. - advanceMyLastAppliedOpTime(time3); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - - // 6) The node catches up time 4 eventually. - startCapturingLogMessages(); - advanceMyLastAppliedOpTime(time4); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); - stopCapturingLogMessages(); - ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime")); - auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); - Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); - ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); -} - -TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) { - startCapturingLogMessages(); - - OpTime time1(Timestamp(100, 1), 0); - OpTime time2(Timestamp(100, 2), 0); - ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, true); - - // Run time far forward and ensure we are still in catchup mode. - // This is an arbitrary time 'far' into the future. - auto later = getNet()->now() + config.getElectionTimeoutPeriod() * 10; - replyHeartbeatsAndRunUntil(later, [this, &config, time2](const NetworkOpIter noi) { - // Other nodes are ahead of me. - getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); - - // Simulate the heartbeats from secondaries to primary to update liveness info. - // TODO(sz): Remove this after merging liveness info and heartbeats. - const RemoteCommandRequest& request = noi->getRequest(); - ReplSetHeartbeatArgsV1 hbArgs; - hbArgs.setConfigVersion(config.getConfigVersion()); - hbArgs.setSetName(config.getReplSetName()); - hbArgs.setSenderHost(request.target); - hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId()); - hbArgs.setTerm(getReplCoord()->getTerm()); - ASSERT(hbArgs.isInitialized()); - ReplSetHeartbeatResponse response; - ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); - }); - ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); - - // Simulate a user initiated abort. - ASSERT_OK(getReplCoord()->abortCatchupIfNeeded()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); - - stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); - ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime")); - ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); - auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); - Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1); - ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); -} - } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 68e1a61c4ac..0495e843590 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -234,11 +234,6 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down. _signalStepDownWaiter_inlock(); - // Abort catchup if we have caught up to the latest known optime after heartbeat refreshing. - if (_catchupState) { - _catchupState->signalHeartbeatUpdate_inlock(); - } - _scheduleHeartbeatToTarget_inlock( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); @@ -670,9 +665,6 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() { } _scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); } - - _topCoord->restartHeartbeats(); - if (isV1ElectionProtocol()) { for (auto&& slaveInfo : _slaveInfo) { slaveInfo.lastUpdate = _replExecutor->now(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 1f3b0881d5c..e76c37898a3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -107,6 +107,11 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext opCtx, replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); replCoord->waitForElectionFinish_forTest(); + // Wait for primary catch-up + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); @@ -5052,6 +5057,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { // Single node cluster - this node should start election on setFollowerMode() completion. replCoord->waitForElectionFinish_forTest(); + simulateCatchUpTimeout(); // Successful dry run election increases term. ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm()); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 18397d23a23..27d62c0af1e 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -497,9 +497,5 @@ void ReplicationCoordinatorMock::setMaster(bool isMaster) { _settings.setMaster(isMaster); } -Status ReplicationCoordinatorMock::abortCatchupIfNeeded() { - return Status::OK(); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 72bc120f226..fdfa491f0ad 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -284,8 +284,6 @@ public: return _service; } - virtual Status abortCatchupIfNeeded() override; - private: AtomicUInt64 _snapshotNameGenerator; ServiceContext* const _service; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index aaa00fcdae4..e5a17dd5c1a 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -323,10 +323,6 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); - // The smallest valid optime in PV1. - OpTime opTime(Timestamp(), 0); - hbResp.setAppliedOpTime(opTime); - hbResp.setDurableOpTime(opTime); hbResp.setConfigVersion(rsConfig.getConfigVersion()); net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true))); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { @@ -492,30 +488,32 @@ void ReplCoordTest::disableSnapshots() { _externalState->setAreSnapshotsEnabled(false); } -void ReplCoordTest::simulateCatchUpAbort() { +void ReplCoordTest::simulateCatchUpTimeout() { NetworkInterfaceMock* net = getNet(); - auto heartbeatTimeoutWhen = - net->now() + getReplCoord()->getConfig().getHeartbeatTimeoutPeriodMillis(); + auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod(); bool hasRequest = false; net->enterNetwork(); - if (net->now() < heartbeatTimeoutWhen) { - net->runUntil(heartbeatTimeoutWhen); + if (net->now() < catchUpTimeoutWhen) { + net->runUntil(catchUpTimeoutWhen); } hasRequest = net->hasReadyRequests(); + net->exitNetwork(); + while (hasRequest) { + net->enterNetwork(); auto noi = net->getNextReadyRequest(); auto request = noi->getRequest(); // Black hole heartbeat requests caused by time advance. log() << "Black holing request to " << request.target.toString() << " : " << request.cmdObj; net->blackHole(noi); - if (net->now() < heartbeatTimeoutWhen) { - net->runUntil(heartbeatTimeoutWhen); + if (net->now() < catchUpTimeoutWhen) { + net->runUntil(catchUpTimeoutWhen); } else { net->runReadyNetworkOperations(); } hasRequest = net->hasReadyRequests(); + net->exitNetwork(); } - net->exitNetwork(); } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index ab2653be11d..972e2f503ae 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -253,9 +253,9 @@ protected: void disableSnapshots(); /** - * Timeout all heartbeat requests for primary catch-up. + * Timeout all freshness scan request for primary catch-up. */ - void simulateCatchUpAbort(); + void simulateCatchUpTimeout(); private: std::unique_ptr<ReplicationCoordinatorImpl> _repl; diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 37aad7459fa..2a2fc1259fb 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -499,21 +499,6 @@ public: */ virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0; - /** - * Reset the booleans to record the last heartbeat restart. - */ - virtual void restartHeartbeats() = 0; - - /** - * Scans through all members that are 'up' and return the latest known optime, if we have - * received (successful or failed) heartbeats from all nodes since heartbeat restart. - * - * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted - * heartbeats. - * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down. - */ - virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const = 0; - protected: TopologyCoordinator() {} }; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 4211dee4860..1c1f53f89a1 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2648,35 +2648,5 @@ void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool support supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; } -void TopologyCoordinatorImpl::restartHeartbeats() { - for (auto& hb : _hbdata) { - hb.restart(); - } -} - -boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const { - // The smallest OpTime in PV1. - OpTime latest(Timestamp(0, 0), 0); - for (size_t i = 0; i < _hbdata.size(); i++) { - auto& peer = _hbdata[i]; - - if (static_cast<int>(i) == _selfIndex) { - continue; - } - // If any heartbeat is not fresh enough, return none. - if (!peer.isUpdatedSinceRestart()) { - return boost::none; - } - // Ignore down members - if (!peer.up()) { - continue; - } - if (peer.getAppliedOpTime() > latest) { - latest = peer.getAppliedOpTime(); - } - } - return latest; -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 0c268b67347..a52758ab8ed 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -246,10 +246,6 @@ public: bool isPriorityTakeover); virtual void setStorageEngineSupportsReadCommitted(bool supported); - virtual void restartHeartbeats(); - - virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; - //////////////////////////////////////////////////////////// // // Test support methods diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index e6865a2e649..fa3834dd03a 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -681,8 +681,7 @@ var ReplSetTest = function(opts) { return config; } - // Check journaling by sending commands through the bridge if it's used. - if (_isRunningWithoutJournaling(this.nodes[0])) { + if (_isRunningWithoutJournaling(replNode)) { config[wcMajorityJournalField] = false; } @@ -874,11 +873,9 @@ var ReplSetTest = function(opts) { }; this.reInitiate = function() { - var config = this.getReplSetConfigFromNode(); - var newConfig = this.getReplSetConfig(); - // Only reset members. - config.members = newConfig.members; - config.version += 1; + var config = this.getReplSetConfig(); + var newVersion = this.getReplSetConfigFromNode().version + 1; + config.version = newVersion; this._setDefaultConfigOptions(config); |