author     Robert Guo <robert.guo@10gen.com>  2017-04-20 10:58:37 -0400
committer  Robert Guo <robert.guo@10gen.com>  2017-04-20 10:58:57 -0400
commit     c08590a6ac9dc54c9d910822d47ea17140b56f89 (patch)
tree       9a6986057f4453f858fac87d43b7435e56f5e807
parent     fac33fe5a6814169c9c6131d80f1b325c74647da (diff)
Revert "SERVER-26848 Exit catchup mode when not syncing more data."
This reverts commit d0c851e2f4bfea514e22c97af1838640d2849a8c.
-rw-r--r--  jstests/multiVersion/downgrade_replset.js  65
-rw-r--r--  jstests/multiVersion/initialsync.js  9
-rw-r--r--  jstests/replsets/catchup.js  187
-rw-r--r--  jstests/replsets/rslib.js  1
-rw-r--r--  src/mongo/db/repl/bgsync.cpp  8
-rw-r--r--  src/mongo/db/repl/member_heartbeat_data.cpp  4
-rw-r--r--  src/mongo/db/repl/member_heartbeat_data.h  14
-rw-r--r--  src/mongo/db/repl/repl_set_commands.cpp  35
-rw-r--r--  src/mongo/db/repl/repl_set_config.cpp  9
-rw-r--r--  src/mongo/db/repl/repl_set_config.h  1
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h  5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  283
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h  97
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp  326
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp  8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp  6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp  22
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.h  4
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h  15
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp  30
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.h  4
-rw-r--r--  src/mongo/shell/replsettest.js  11
24 files changed, 330 insertions, 820 deletions
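
This revert backs out the heartbeat-driven catch-up mode added by SERVER-26848 and restores the earlier freshness-scan flow: after winning an election, the new primary scans the other members for their latest applied optime, tries to catch up to the freshest one within catchUpTimeoutMillis, and then enters drain mode. The two config-level effects of the revert, restated as a minimal C++ sketch (the constant name matches repl_set_config.cpp below; std::chrono stands in for mongo::Milliseconds):

    #include <chrono>

    // Restored default (see repl_set_config.cpp below): primary catch-up is
    // again bounded by a finite 2-second timeout.
    constexpr std::chrono::milliseconds kDefaultCatchUpTimeoutPeriod{2000};

    // Removed sentinel: before this revert the default was -1, meaning
    // "catch up with no timeout"; after it, negative values are rejected.
    // constexpr std::chrono::milliseconds kInfiniteCatchUpTimeout{-1};
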
diff --git a/jstests/multiVersion/downgrade_replset.js b/jstests/multiVersion/downgrade_replset.js
index 022471410a1..658b35813a2 100644
--- a/jstests/multiVersion/downgrade_replset.js
+++ b/jstests/multiVersion/downgrade_replset.js
@@ -14,45 +14,38 @@ var nodes = {
n3: {binVersion: newVersion}
};
-function runDowngradeTest(protocolVersion) {
- var rst = new ReplSetTest({name: name, nodes: nodes});
- rst.startSet();
- var replSetConfig = rst.getReplSetConfig();
- replSetConfig.protocolVersion = protocolVersion;
- // Hard-code catchup timeout to be compatible with 3.4
- replSetConfig.settings = {catchUpTimeoutMillis: 2000};
- rst.initiate(replSetConfig);
-
- var primary = rst.getPrimary();
- var coll = "test.foo";
-
- jsTest.log("Inserting documents into collection.");
- for (var i = 0; i < 10; i++) {
- primary.getCollection(coll).insert({_id: i, str: "hello world"});
- }
+var rst = new ReplSetTest({name: name, nodes: nodes});
+rst.startSet();
+var replSetConfig = rst.getReplSetConfig();
+replSetConfig.protocolVersion = 0;
+rst.initiate(replSetConfig);
+
+var primary = rst.getPrimary();
+var coll = "test.foo";
+
+jsTest.log("Inserting documents into collection.");
+for (var i = 0; i < 10; i++) {
+ primary.getCollection(coll).insert({_id: i, str: "hello world"});
+}
- function insertDocuments(rsURL, coll) {
- var coll = new Mongo(rsURL).getCollection(coll);
- var count = 10;
- while (!isFinished()) {
- assert.writeOK(coll.insert({_id: count, str: "hello world"}));
- count++;
- }
+function insertDocuments(rsURL, coll) {
+ var coll = new Mongo(rsURL).getCollection(coll);
+ var count = 10;
+ while (!isFinished()) {
+ assert.writeOK(coll.insert({_id: count, str: "hello world"}));
+ count++;
}
+}
- jsTest.log("Starting parallel operations during downgrade..");
- var joinFindInsert = startParallelOps(primary, insertDocuments, [rst.getURL(), coll]);
-
- jsTest.log("Downgrading replica set..");
- rst.upgradeSet({binVersion: oldVersion});
- jsTest.log("Downgrade complete.");
+jsTest.log("Starting parallel operations during downgrade..");
+var joinFindInsert = startParallelOps(primary, insertDocuments, [rst.getURL(), coll]);
- primary = rst.getPrimary();
- printjson(rst.status());
+jsTest.log("Downgrading replica set..");
+rst.upgradeSet({binVersion: oldVersion});
+jsTest.log("Downgrade complete.");
- joinFindInsert();
- rst.stopSet();
-}
+primary = rst.getPrimary();
+printjson(rst.status());
-runDowngradeTest(0);
-runDowngradeTest(1);
+joinFindInsert();
+rst.stopSet();
diff --git a/jstests/multiVersion/initialsync.js b/jstests/multiVersion/initialsync.js
index a36d538a6f8..d8c1d629fd0 100644
--- a/jstests/multiVersion/initialsync.js
+++ b/jstests/multiVersion/initialsync.js
@@ -7,15 +7,13 @@ var newVersion = "latest";
var name = "multiversioninitsync";
-var multitest = function(replSetVersion, newNodeVersion, configSettings) {
+var multitest = function(replSetVersion, newNodeVersion) {
var nodes = {n1: {binVersion: replSetVersion}, n2: {binVersion: replSetVersion}};
print("Start up a two-node " + replSetVersion + " replica set.");
var rst = new ReplSetTest({name: name, nodes: nodes});
rst.startSet();
- var conf = rst.getReplSetConfig();
- conf.settings = configSettings;
- rst.initiate(conf);
+ rst.initiate();
// Wait for a primary node.
var primary = rst.getPrimary();
@@ -52,5 +50,4 @@ multitest(oldVersion, newVersion);
// Old Secondary is synced from a "latest"
// version ReplSet.
// *****************************************
-// Hard-code catchup timeout. The default timeout on 3.5 is -1, which is invalid on 3.4.
-multitest(newVersion, oldVersion, {catchUpTimeoutMillis: 2000});
+multitest(newVersion, oldVersion);
diff --git a/jstests/replsets/catchup.js b/jstests/replsets/catchup.js
index 51632379463..542ad51c723 100644
--- a/jstests/replsets/catchup.js
+++ b/jstests/replsets/catchup.js
@@ -12,7 +12,6 @@
rst.startSet();
var conf = rst.getReplSetConfig();
- conf.members[2].priority = 0;
conf.settings = {
heartbeatIntervalMillis: 500,
electionTimeoutMillis: 10000,
@@ -35,7 +34,7 @@
node.adminCommand(verbosity);
});
- function stepUpNode(node) {
+ function stepUp(node) {
assert.soon(function() {
node.adminCommand({replSetStepUp: 1});
return node.adminCommand('replSetGetStatus').myState == ReplSetTest.State.PRIMARY;
@@ -44,6 +43,12 @@
return node;
}
+ function doWrites(node) {
+ for (var i = 0; i < 3; i++) {
+ assert.writeOK(node.getDB("test").foo.insert({x: i}));
+ }
+ }
+
function checkOpInOplog(node, op, count) {
node.getDB("admin").getMongo().setSlaveOk();
var oplog = node.getDB("local")['oplog.rs'];
@@ -51,148 +56,98 @@
assert.eq(oplog.count(op), count, "op: " + tojson(op) + ", oplog: " + tojson(oplogArray));
}
- // Stop replication on secondaries, do writes and step up one of the secondaries.
- //
- // The old primary has extra writes that are not replicated to the other nodes yet,
- // but the new primary steps up, getting the vote from the third node "voter".
- function stopRelicationAndEnforceNewPrimaryToCatchUp() {
- // Write documents that cannot be replicated to secondaries in time.
- var oldSecondaries = rst.getSecondaries();
- var oldPrimary = rst.getPrimary();
- stopServerReplication(oldSecondaries);
- for (var i = 0; i < 3; i++) {
- assert.writeOK(oldPrimary.getDB("test").foo.insert({x: i}));
+ function isEarlierTimestamp(ts1, ts2) {
+ if (ts1.getTime() == ts2.getTime()) {
+ return ts1.getInc() < ts2.getInc();
}
- var latestOpOnOldPrimary = getLatestOp(oldPrimary);
- // New primary wins immediately, but needs to catch up.
- var newPrimary = stepUpNode(oldSecondaries[0]);
- rst.awaitNodesAgreeOnPrimary();
- var latestOpOnNewPrimary = getLatestOp(newPrimary);
- // Check this node is not writable.
- assert.eq(newPrimary.getDB("test").isMaster().ismaster, false);
-
- return {
- oldSecondaries: oldSecondaries,
- oldPrimary: oldPrimary,
- newPrimary: newPrimary,
- voter: oldSecondaries[1],
- latestOpOnOldPrimary: latestOpOnOldPrimary,
- latestOpOnNewPrimary: latestOpOnNewPrimary
- };
- }
-
- function reconfigCatchUpTimeoutMillis(timeout) {
- // Reconnect all nodes to make sure reconfig succeeds.
- rst.nodes.forEach(reconnect);
- // Reconfigure replicaset to decrease catchup timeout
- conf = rst.getReplSetConfigFromNode();
- conf.version++;
- conf.settings.catchUpTimeoutMillis = timeout;
- reconfig(rst, conf);
- rst.awaitReplication();
- rst.awaitNodesAgreeOnPrimary();
+ return ts1.getTime() < ts2.getTime();
}
- rst.awaitReplication();
+ rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
- jsTest.log("Case 1: The primary is up-to-date after refreshing heartbeats.");
+ jsTest.log("Case 1: The primary is up-to-date after freshness scan.");
// Should complete transition to primary immediately.
- var newPrimary = stepUpNode(rst.getSecondary());
+ var newPrimary = stepUp(rst.getSecondary());
rst.awaitNodesAgreeOnPrimary();
// Should win an election and finish the transition very quickly.
assert.eq(newPrimary, rst.getPrimary());
- rst.awaitReplication();
+ rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
jsTest.log("Case 2: The primary needs to catch up, succeeds in time.");
- var stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp();
-
+ // Write documents that cannot be replicated to secondaries in time.
+ var originalSecondaries = rst.getSecondaries();
+ stopServerReplication(originalSecondaries);
+ doWrites(rst.getPrimary());
+ var latestOp = getLatestOp(rst.getPrimary());
+ // New primary wins immediately, but needs to catch up.
+ newPrimary = stepUp(rst.getSecondary());
+ rst.awaitNodesAgreeOnPrimary();
+ // Check this node is not writable.
+ assert.eq(newPrimary.getDB("test").isMaster().ismaster, false);
// Disable fail point to allow replication.
- restartServerReplication(stepUpResults.oldSecondaries);
+ restartServerReplication(originalSecondaries);
// getPrimary() blocks until the primary finishes drain mode.
- assert.eq(stepUpResults.newPrimary, rst.getPrimary());
+ assert.eq(newPrimary, rst.getPrimary());
// Wait for all secondaries to catch up
rst.awaitReplication();
// Check the latest op on old primary is preserved on the new one.
- checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1);
- rst.awaitReplication();
+ checkOpInOplog(newPrimary, latestOp, 1);
+ rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
jsTest.log("Case 3: The primary needs to catch up, but has to change sync source to catch up.");
- stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp();
-
- // Disable fail point on the voter. Wait until it catches up with the old primary.
- restartServerReplication(stepUpResults.voter);
- assert.commandWorked(
- stepUpResults.voter.adminCommand({replSetSyncFrom: stepUpResults.oldPrimary.host}));
- awaitOpTime(stepUpResults.voter, stepUpResults.latestOpOnOldPrimary.ts);
+ // Write documents that cannot be replicated to secondaries in time.
+ stopServerReplication(rst.getSecondaries());
+ doWrites(rst.getPrimary());
+ var oldPrimary = rst.getPrimary();
+ originalSecondaries = rst.getSecondaries();
+ latestOp = getLatestOp(oldPrimary);
+ newPrimary = stepUp(originalSecondaries[0]);
+ rst.awaitNodesAgreeOnPrimary();
+ // Disable fail point on one of the other secondaries.
+ // Wait until it catches up with the old primary.
+ restartServerReplication(originalSecondaries[1]);
+ assert.commandWorked(originalSecondaries[1].adminCommand({replSetSyncFrom: oldPrimary.host}));
+ awaitOpTime(originalSecondaries[1], latestOp.ts);
// Disconnect the new primary and the old one.
- stepUpResults.oldPrimary.disconnect(stepUpResults.newPrimary);
+ oldPrimary.disconnect(newPrimary);
// Disable the failpoint, the new primary should sync from the other secondary.
- restartServerReplication(stepUpResults.newPrimary);
- assert.eq(stepUpResults.newPrimary, rst.getPrimary());
- checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1);
+ restartServerReplication(newPrimary);
+ assert.eq(newPrimary, rst.getPrimary());
+ checkOpInOplog(newPrimary, latestOp, 1);
// Restore the broken connection
- stepUpResults.oldPrimary.reconnect(stepUpResults.newPrimary);
- rst.awaitReplication();
+ oldPrimary.reconnect(newPrimary);
+ rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
jsTest.log("Case 4: The primary needs to catch up, fails due to timeout.");
- reconfigCatchUpTimeoutMillis(10 * 1000);
-
- stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp();
- // Wait until the new primary completes the transition to primary and writes a no-op.
- checkLog.contains(stepUpResults.newPrimary, "Catchup timed out after becoming primary");
- restartServerReplication(stepUpResults.newPrimary);
- assert.eq(stepUpResults.newPrimary, rst.getPrimary());
-
- // Wait for the no-op "new primary" after winning an election, so that we know it has
- // finished transition to primary.
- assert.soon(function() {
- return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary,
- getLatestOp(stepUpResults.newPrimary)) < 0;
- });
- // The extra oplog entries on the old primary are not replicated to the new one.
- checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0);
- restartServerReplication(stepUpResults.voter);
- rst.awaitReplication();
+ // Reconfigure replicaset to decrease catchup timeout
+ conf = rst.getReplSetConfigFromNode();
+ conf.version++;
+ conf.settings.catchUpTimeoutMillis = 10 * 1000;
+ reconfig(rst, conf);
+ rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
+ rst.awaitNodesAgreeOnPrimary();
- jsTest.log("Case 5: The primary needs to catch up with no timeout, then gets aborted.");
- reconfigCatchUpTimeoutMillis(-1);
- stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp();
+ // Write documents that cannot be replicated to secondaries in time.
+ originalSecondaries = rst.getSecondaries();
+ stopServerReplication(originalSecondaries);
+ doWrites(rst.getPrimary());
+ latestOp = getLatestOp(rst.getPrimary());
- // Abort catchup.
- assert.commandWorked(stepUpResults.newPrimary.adminCommand({replSetAbortPrimaryCatchUp: 1}));
+ // New primary wins immediately, but needs to catch up.
+ newPrimary = stepUp(originalSecondaries[0]);
+ rst.awaitNodesAgreeOnPrimary();
+ var latestOpOnNewPrimary = getLatestOp(newPrimary);
+ // Wait until the new primary completes the transition to primary and writes a no-op.
+ checkLog.contains(newPrimary, "Cannot catch up oplog after becoming primary");
+ restartServerReplication(newPrimary);
+ assert.eq(newPrimary, rst.getPrimary());
// Wait for the no-op "new primary" after winning an election, so that we know it has
// finished transition to primary.
assert.soon(function() {
- return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary,
- getLatestOp(stepUpResults.newPrimary)) < 0;
+ return isEarlierTimestamp(latestOpOnNewPrimary.ts, getLatestOp(newPrimary).ts);
});
// The extra oplog entries on the old primary are not replicated to the new one.
- checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0);
- restartServerReplication(stepUpResults.oldSecondaries);
- rst.awaitReplication();
- checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0);
-
- // TODO: Uncomment case 6 when SERVER-28751 gets fixed.
- //
- // jsTest.log("Case 6: The primary needs to catch up with no timeout, but steps down.");
- // var stepUpResults = stopRelicationAndEnforceNewPrimaryToCatchUp();
-
- // // Step-down command should abort catchup.
- // try {
- // printjson(stepUpResults.newPrimary.adminCommand({replSetStepDown: 60}));
- // } catch (e) {
- // print(e);
- // }
- // // Rename the primary.
- // var steppedDownPrimary = stepUpResults.newPrimary;
- // var newPrimary = rst.getPrimary();
- // assert.neq(newPrimary, steppedDownPrimary);
-
- // // Enable data replication on the stepped down primary and make sure it syncs old writes.
- // rst.nodes.forEach(reconnect);
- // restartServerReplication(stepUpResults.oldSecondaries);
- // rst.awaitReplication();
- // checkOpInOplog(steppedDownPrimary, stepUpResults.latestOpOnOldPrimary, 1);
-
+ checkOpInOplog(newPrimary, latestOp, 0);
+ restartServerReplication(originalSecondaries[1]);
})();
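
The isEarlierTimestamp() helper reintroduced above orders two oplog timestamps by seconds first and, within the same second, by increment. The same comparison as a self-contained C++ sketch, with a hypothetical Timestamp struct standing in for the shell type:

    #include <cstdint>
    #include <iostream>

    // Hypothetical stand-in for the shell's Timestamp: wall-clock seconds
    // (getTime()) plus an increment (getInc()) ordering ops within a second.
    struct Timestamp {
        uint32_t secs;
        uint32_t inc;
    };

    // Same logic as the reintroduced JS helper: seconds dominate; the
    // increment breaks ties within the same second.
    bool isEarlierTimestamp(const Timestamp& ts1, const Timestamp& ts2) {
        if (ts1.secs == ts2.secs) {
            return ts1.inc < ts2.inc;
        }
        return ts1.secs < ts2.secs;
    }

    int main() {
        std::cout << isEarlierTimestamp({100, 1}, {100, 2}) << '\n';  // 1: tie on seconds, smaller inc
        std::cout << isEarlierTimestamp({101, 0}, {100, 9}) << '\n';  // 0: later second is never earlier
        return 0;
    }
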
diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js
index 5911723d717..1471824bd8f 100644
--- a/jstests/replsets/rslib.js
+++ b/jstests/replsets/rslib.js
@@ -162,7 +162,6 @@ var getLastOpTime;
if (!isNetworkError(e)) {
throw e;
}
- print("Calling replSetReconfig failed. " + tojson(e));
}
var master = rs.getPrimary().getDB("admin");
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 9d129284de0..df792fa6fdc 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -313,12 +313,13 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// All (accessible) sync sources were too stale.
+ // TODO: End catchup mode early if we are too stale.
if (_replCoord->getMemberState().primary()) {
warning() << "Too stale to catch up.";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen
<< " from " << syncSourceResp.getSyncSource();
- _replCoord->abortCatchupIfNeeded();
+ sleepsecs(1);
return;
}
@@ -567,8 +568,9 @@ void BackgroundSync::_runRollback(OperationContext* opCtx,
int requiredRBID,
StorageInterface* storageInterface) {
if (_replCoord->getMemberState().primary()) {
- warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode.";
- _replCoord->abortCatchupIfNeeded();
+ // TODO: Abort catchup mode early if rollback detected.
+ warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
+ sleepsecs(1);
return;
}
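
With abortCatchupIfNeeded() gone, a catching-up primary that finds itself too stale or facing a rollback can no longer cut catch-up short; the restored code logs and sleeps so that the catch-up timeout ends the mode instead. A minimal sketch of that producer-loop behavior, with isPrimary and syncSourceUnusable as illustrative inputs rather than real coordinator calls:

    #include <chrono>
    #include <iostream>
    #include <thread>

    // Sketch of the restored _produce()/_runRollback() behavior: when the
    // node is a primary still in catch-up mode and its sync source cannot be
    // used (all sources too stale, or a rollback situation), it logs, sleeps
    // one second (sleepsecs(1)), and returns, leaving the catch-up timeout
    // to end the mode.
    void producePassWhileCatchingUp(bool isPrimary, bool syncSourceUnusable) {
        if (isPrimary && syncSourceUnusable) {
            std::cerr << "cannot sync in catch-up mode; catch-up mode will end\n";
            std::this_thread::sleep_for(std::chrono::seconds(1));
            return;  // retry on the next producer pass
        }
        // ... otherwise pick a sync source and fetch more oplog entries ...
    }

    int main() {
        producePassWhileCatchingUp(true, true);    // logs, then sleeps a second
        producePassWhileCatchingUp(false, false);  // normal path: would fetch oplog
    }
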
diff --git a/src/mongo/db/repl/member_heartbeat_data.cpp b/src/mongo/db/repl/member_heartbeat_data.cpp
index 1b9b9ea3f13..c267a6ba8ed 100644
--- a/src/mongo/db/repl/member_heartbeat_data.cpp
+++ b/src/mongo/db/repl/member_heartbeat_data.cpp
@@ -54,8 +54,6 @@ void MemberHeartbeatData::setUpValues(Date_t now,
}
_authIssue = false;
_lastHeartbeat = now;
- _updatedSinceRestart = true;
-
if (!hbResponse.hasState()) {
hbResponse.setState(MemberState::RS_UNKNOWN);
}
@@ -79,7 +77,6 @@ void MemberHeartbeatData::setDownValues(Date_t now, const std::string& heartbeat
_upSince = Date_t();
_lastHeartbeat = now;
_authIssue = false;
- _updatedSinceRestart = true;
_lastResponse = ReplSetHeartbeatResponse();
_lastResponse.setState(MemberState::RS_DOWN);
@@ -94,7 +91,6 @@ void MemberHeartbeatData::setAuthIssue(Date_t now) {
_upSince = Date_t();
_lastHeartbeat = now;
_authIssue = true;
- _updatedSinceRestart = true;
_lastResponse = ReplSetHeartbeatResponse();
_lastResponse.setState(MemberState::RS_UNKNOWN);
diff --git a/src/mongo/db/repl/member_heartbeat_data.h b/src/mongo/db/repl/member_heartbeat_data.h
index f67a0a87757..0f5dacd9081 100644
--- a/src/mongo/db/repl/member_heartbeat_data.h
+++ b/src/mongo/db/repl/member_heartbeat_data.h
@@ -123,17 +123,6 @@ public:
*/
void setAuthIssue(Date_t now);
- /**
- * Reset the boolean to record the last restart.
- */
- void restart() {
- _updatedSinceRestart = false;
- }
-
- bool isUpdatedSinceRestart() const {
- return _updatedSinceRestart;
- }
-
private:
// -1 = not checked yet, 0 = member is down/unreachable, 1 = member is up
int _health;
@@ -150,9 +139,6 @@ private:
// The last heartbeat response we received.
ReplSetHeartbeatResponse _lastResponse;
-
- // Have we received heartbeats since the last restart?
- bool _updatedSinceRestart = false;
};
} // namespace repl
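
The deleted restart()/isUpdatedSinceRestart() pair implemented a small bookkeeping pattern: clear a per-member flag when heartbeats are restarted, then set it again whenever any heartbeat outcome (up, down, or auth issue) is recorded, so the coordinator could tell when a full post-election heartbeat round had completed. A compact sketch of just that pattern, with recordHeartbeatOutcome() as a hypothetical stand-in for setUpValues()/setDownValues()/setAuthIssue():

    #include <iostream>

    class MemberHeartbeatData {
    public:
        // Called when heartbeats are restarted; forgets earlier responses.
        void restart() { _updatedSinceRestart = false; }

        // Stand-in for setUpValues()/setDownValues()/setAuthIssue(), each of
        // which set the flag before this revert removed it.
        void recordHeartbeatOutcome() { _updatedSinceRestart = true; }

        bool isUpdatedSinceRestart() const { return _updatedSinceRestart; }

    private:
        bool _updatedSinceRestart = false;
    };

    int main() {
        MemberHeartbeatData member;
        member.restart();
        std::cout << member.isUpdatedSinceRestart() << '\n';  // 0: nothing heard yet
        member.recordHeartbeatOutcome();
        std::cout << member.isUpdatedSinceRestart() << '\n';  // 1: heard since restart
    }
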
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index 09226ba0237..6348f72bfbd 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -868,7 +868,7 @@ public:
status = getGlobalReplicationCoordinator()->stepUpIfEligible();
if (!status.isOK()) {
- log() << "replSetStepUp request failed" << causedBy(status);
+ log() << "replSetStepUp request failed " << causedBy(status);
}
return appendCommandStatus(result, status);
@@ -880,38 +880,5 @@ private:
}
} cmdReplSetStepUp;
-class CmdReplSetAbortPrimaryCatchUp : public ReplSetCommand {
-public:
- virtual void help(stringstream& help) const {
- help << "{ CmdReplSetAbortPrimaryCatchUp : 1 }\n";
- help << "Abort primary catch-up mode; immediately finish the transition to primary "
- "without fetching any further unreplicated writes from any other online nodes";
- }
-
- CmdReplSetAbortPrimaryCatchUp() : ReplSetCommand("replSetAbortPrimaryCatchUp") {}
-
- virtual bool run(OperationContext* opCtx,
- const string&,
- BSONObj& cmdObj,
- string& errmsg,
- BSONObjBuilder& result) override {
- Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result);
- if (!status.isOK())
- return appendCommandStatus(result, status);
- log() << "Received replSetAbortPrimaryCatchUp request";
-
- status = getGlobalReplicationCoordinator()->abortCatchupIfNeeded();
- if (!status.isOK()) {
- log() << "replSetAbortPrimaryCatchUp request failed" << causedBy(status);
- }
- return appendCommandStatus(result, status);
- }
-
-private:
- ActionSet getAuthActionSet() const override {
- return ActionSet{ActionType::replSetStateChange};
- }
-} cmdReplSetAbortPrimaryCatchUp;
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp
index 2fbbd7cad52..f7140cc56bf 100644
--- a/src/mongo/db/repl/repl_set_config.cpp
+++ b/src/mongo/db/repl/repl_set_config.cpp
@@ -44,7 +44,6 @@ namespace repl {
const size_t ReplSetConfig::kMaxMembers;
const size_t ReplSetConfig::kMaxVotingMembers;
-const Milliseconds ReplSetConfig::kInfiniteCatchUpTimeout(-1);
const std::string ReplSetConfig::kConfigServerFieldName = "configsvr";
const std::string ReplSetConfig::kVersionFieldName = "version";
@@ -52,7 +51,7 @@ const std::string ReplSetConfig::kMajorityWriteConcernModeName = "$majority";
const Milliseconds ReplSetConfig::kDefaultHeartbeatInterval(2000);
const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10);
const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000);
-const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(kInfiniteCatchUpTimeout);
+const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(2000);
const bool ReplSetConfig::kDefaultChainingAllowed(true);
namespace {
@@ -271,14 +270,14 @@ Status ReplSetConfig::_parseSettingsSubdocument(const BSONObj& settings) {
//
// Parse catchUpTimeoutMillis
//
- auto validCatchUpTimeout = [](long long timeout) { return timeout >= 0LL || timeout == -1LL; };
+ auto notLessThanZero = stdx::bind(std::greater_equal<long long>(), stdx::placeholders::_1, 0);
long long catchUpTimeoutMillis;
Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf(
settings,
kCatchUpTimeoutFieldName,
durationCount<Milliseconds>(kDefaultCatchUpTimeoutPeriod),
- validCatchUpTimeout,
- "catch-up timeout must be positive, 0 (no catch-up) or -1 (infinite catch-up).",
+ notLessThanZero,
+ "catch-up timeout must be greater than or equal to 0",
&catchUpTimeoutMillis);
if (!catchUpTimeoutStatus.isOK()) {
return catchUpTimeoutStatus;
diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h
index 63d5bcb80e8..e44e51b12a7 100644
--- a/src/mongo/db/repl/repl_set_config.h
+++ b/src/mongo/db/repl/repl_set_config.h
@@ -59,7 +59,6 @@ public:
static const size_t kMaxMembers = 50;
static const size_t kMaxVotingMembers = 7;
- static const Milliseconds kInfiniteCatchUpTimeout;
static const Milliseconds kDefaultElectionTimeoutPeriod;
static const Milliseconds kDefaultHeartbeatInterval;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index c53f5a7b655..8446e718d30 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -880,11 +880,6 @@ public:
virtual ServiceContext* getServiceContext() = 0;
- /**
- * Abort catchup if the node is in catchup mode.
- */
- virtual Status abortCatchupIfNeeded() = 0;
-
protected:
ReplicationCoordinator();
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 301f2c92c7c..f42e712ffd3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -155,45 +155,54 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
return toBSON().toString();
}
-ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
- : opTime(std::move(_opTime)), writeConcern(_writeConcern) {}
+struct ReplicationCoordinatorImpl::WaiterInfo {
-BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const {
- BSONObjBuilder bob;
- bob.append("opTime", opTime.toBSON());
- if (writeConcern) {
- bob.append("writeConcern", writeConcern->toBSON());
- }
- return bob.obj();
-};
-
-std::string ReplicationCoordinatorImpl::Waiter::toString() const {
- return toBSON().toString();
-};
+ using FinishFunc = stdx::function<void()>;
+ WaiterInfo(unsigned int _opID,
+ const OpTime _opTime,
+ const WriteConcernOptions* _writeConcern,
+ stdx::condition_variable* _condVar)
+ : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {}
-ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
- const WriteConcernOptions* _writeConcern,
- stdx::condition_variable* _condVar)
- : Waiter(_opTime, _writeConcern), condVar(_condVar) {}
+ // When waiter is signaled, finishCallback will be called while holding replCoord _mutex
+ // since WaiterLists are protected by _mutex.
+ WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback)
+ : opTime(_opTime), finishCallback(_finishCallback) {}
-void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
- invariant(condVar);
- condVar->notify_all();
-}
+ BSONObj toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("opId", opID);
+ bob.append("opTime", opTime.toBSON());
+ if (writeConcern) {
+ bob.append("writeConcern", writeConcern->toBSON());
+ }
+ return bob.obj();
+ };
-ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime,
- FinishFunc _finishCallback)
- : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {}
+ std::string toString() const {
+ return toBSON().toString();
+ };
-void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() {
- invariant(finishCallback);
- finishCallback();
-}
+ // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex.
+ void notify() {
+ if (condVar) {
+ condVar->notify_all();
+ }
+ if (finishCallback) {
+ finishCallback();
+ }
+ }
+ const unsigned int opID = 0;
+ const OpTime opTime;
+ const WriteConcernOptions* writeConcern = nullptr;
+ stdx::condition_variable* condVar = nullptr;
+ // The callback that will be called when this waiter is notified.
+ FinishFunc finishCallback = nullptr;
+};
-class ReplicationCoordinatorImpl::WaiterGuard {
-public:
+struct ReplicationCoordinatorImpl::WaiterInfoGuard {
/**
* Constructor takes the list of waiters and enqueues itself on the list, removing itself
* in the destructor.
@@ -205,17 +214,23 @@ public:
* _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
* of these without holding _mutex
*/
- WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
- list->add_inlock(_waiter);
+ WaiterInfoGuard(WaiterList* list,
+ unsigned int opID,
+ const OpTime opTime,
+ const WriteConcernOptions* writeConcern,
+ stdx::condition_variable* condVar)
+ : waiter(opID, opTime, writeConcern, condVar), _list(list) {
+ list->add_inlock(&waiter);
}
- ~WaiterGuard() {
- _list->remove_inlock(_waiter);
+ ~WaiterInfoGuard() {
+ _list->remove_inlock(&waiter);
}
+ WaiterInfo waiter;
+
private:
WaiterList* _list;
- Waiter* _waiter;
};
void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
@@ -224,46 +239,33 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
stdx::function<bool(WaiterType)> func) {
- // Only advance iterator when the element doesn't match.
- for (auto it = _list.begin(); it != _list.end();) {
- if (!func(*it)) {
- ++it;
- continue;
- }
-
- WaiterType waiter = std::move(*it);
- if (it == std::prev(_list.end())) {
- // Iterator will be invalid after erasing the last element, so set it to the
- // next one (i.e. end()).
- it = _list.erase(it);
- } else {
- // Iterator is still valid after pop_back().
- std::swap(*it, _list.back());
- _list.pop_back();
+ std::vector<WaiterType>::iterator it = _list.end();
+ while (true) {
+ it = std::find_if(_list.begin(), _list.end(), func);
+ if (it == _list.end()) {
+ break;
}
-
- // It's important to call notify() after the waiter has been removed from the list
- // since notify() might remove the waiter itself.
- waiter->notify_inlock();
+ (*it)->notify();
+ std::swap(*it, _list.back());
+ _list.pop_back();
}
}
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
- std::vector<WaiterType> list = std::move(_list);
- // Call notify() after removing the waiters from the list.
- for (auto& waiter : list) {
- waiter->notify_inlock();
+ for (auto& waiter : _list) {
+ waiter->notify();
}
+ _list.clear();
}
bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
auto it = std::find(_list.begin(), _list.end(), waiter);
- if (it == _list.end()) {
- return false;
+ if (it != _list.end()) {
+ std::swap(*it, _list.back());
+ _list.pop_back();
+ return true;
}
- std::swap(*it, _list.back());
- _list.pop_back();
- return true;
+ return false;
}
namespace {
@@ -1184,7 +1186,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
_updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);
_opTimeWaiterList.signalAndRemoveIf_inlock(
- [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
+ [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -1350,11 +1352,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
// We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
- ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
- WaiterGuard guard(&_opTimeWaiterList, &waiter);
+ WaiterInfoGuard waitInfo(
+ &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar);
- LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
- << waiter << " until " << opCtx->getDeadline();
+ LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until "
+ << opCtx->getDeadline();
auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
if (!waitStatus.isOK()) {
@@ -1742,8 +1744,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
- ThreadWaiter waiter(opTime, &writeConcern, &condVar);
- WaiterGuard guard(&_replicationWaiterList, &waiter);
+ WaiterInfoGuard waitInfo(
+ &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
if (_inShutdown) {
@@ -1761,7 +1763,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
BSONObjBuilder progress;
_appendSlaveInfoData_inlock(&progress);
log() << "Replication for failed WC: " << writeConcern.toBSON()
- << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
+ << ", waitInfo:" << waitInfo.waiter.toBSON()
<< ", progress: " << progress.done();
}
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
@@ -2658,11 +2660,8 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
result = kActionFollowerModeStateChange;
}
- // Exit catchup mode if we're in it and enable replication producer and applier on stepdown.
+ // Enable replication producer and applier on stepdown.
if (_memberState.primary()) {
- if (_catchupState) {
- _catchupState->abort_inlock();
- }
_applierState = ApplierState::Running;
_externalState->startProducerIfStopped();
}
@@ -2766,18 +2765,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
invariant(nextAction != kActionWinElection);
lk.unlock();
_performPostMemberStateUpdateAction(nextAction);
- lk.lock();
- if (!_getMemberState_inlock().primary()) {
- break;
- }
// Notify all secondaries of the election win.
- _restartHeartbeats_inlock();
+ lk.lock();
+ _scheduleElectionWinNotification_inlock();
if (isV1ElectionProtocol()) {
- invariant(!_catchupState);
- _catchupState = stdx::make_unique<CatchupState>(this);
- _catchupState->start_inlock();
+ _scanOpTimeForCatchUp_inlock();
} else {
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
}
break;
}
@@ -2792,114 +2786,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
}
}
-void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
- log() << "Entering primary catch-up mode.";
-
- // No catchup in single node replica set.
- if (_repl->_rsConfig.getNumMembers() == 1) {
- abort_inlock();
- return;
- }
-
- auto timeoutCB = [this](const CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- return;
- }
- log() << "Catchup timed out after becoming primary.";
- stdx::lock_guard<stdx::mutex> lk(_repl->_mutex);
- abort_inlock();
- };
-
- // Schedule timeout callback.
- auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod();
- // Deal with infinity and overflow - no timeout.
- if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout ||
- Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) {
- return;
- }
- auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
- auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
- if (!status.isOK()) {
- log() << "Failed to schedule catchup timeout work.";
- abort_inlock();
- return;
- }
- _timeoutCbh = status.getValue();
-}
-
-void ReplicationCoordinatorImpl::CatchupState::abort_inlock() {
- invariant(_repl->_getMemberState_inlock().primary());
-
- log() << "Exited primary catch-up mode.";
- // Clean up its own members.
- if (_timeoutCbh) {
- _repl->_replExecutor->cancel(_timeoutCbh);
- }
- if (_waiter) {
- _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
- }
-
- // Enter primary drain mode.
- _repl->_enterDrainMode_inlock();
- // Destruct the state itself.
- _repl->_catchupState.reset(nullptr);
-}
-
-void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() {
- auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart();
- // Haven't collected all heartbeat responses.
- if (!targetOpTime) {
- return;
- }
-
- // We've caught up.
- if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
- log() << "Caught up to the latest known optime via heartbeats after becoming primary.";
- abort_inlock();
- return;
- }
-
- // Reset the target optime if it has changed.
- if (_waiter && _waiter->opTime == *targetOpTime) {
- return;
- }
-
- log() << "Heartbeats updated catchup target optime to " << *targetOpTime;
- if (_waiter) {
- _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
- }
- auto targetOpTimeCB = [this, targetOpTime]() {
- // Double check the target time since stepdown may signal us too.
- if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
- log() << "Caught up to the latest known optime successfully after becoming primary.";
- abort_inlock();
- }
- };
- _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB);
- _repl->_opTimeWaiterList.add_inlock(_waiter.get());
-}
-
-Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
- if (!isV1ElectionProtocol()) {
- return Status(ErrorCodes::CommandNotSupported,
- "Primary catch-up is only supported by Protocol Version 1");
- }
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_catchupState) {
- _catchupState->abort_inlock();
- return Status::OK();
- }
- return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
-}
-
void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto scanner = std::make_shared<FreshnessScanner>();
auto scanStartTime = _replExecutor->now();
auto evhStatus = scanner->start(
_replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2908,7 +2801,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2937,7 +2830,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Could not access any nodes within timeout when checking for "
<< "additional ops to apply before finishing transition to primary. "
<< "Will move forward with becoming primary anyway.";
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
@@ -2946,7 +2839,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
log() << "My optime is most up-to-date, skipping catch-up "
<< "and completing transition to primary.";
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
@@ -2959,9 +2852,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Finished catch-up oplog after becoming primary.";
}
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
};
- auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB);
+ auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
_opTimeWaiterList.add_inlock(waiterInfo.get());
auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
@@ -2974,7 +2867,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
_replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB);
}
-void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
+void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() {
_applierState = ApplierState::Draining;
_externalState->stopProducer();
}
@@ -3065,7 +2958,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
}
void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
- _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
+ _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) {
return _doneWaitingForReplication_inlock(
waiter->opTime, SnapshotName::min(), *waiter->writeConcern);
});
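
The WaiterList changes above restore a swap-and-pop removal scheme: signalAndRemoveIf_inlock() repeatedly finds a matching waiter, notifies it, then swaps it with the last element and pops, which is O(1) per removal since the list is unordered. (The version being reverted deliberately removed the waiter before notifying, because a notification callback may remove the waiter itself; the restored code notifies first.) A self-contained sketch of the restored pattern, with a simplified Waiter holding only an optime and a callback:

    #include <algorithm>
    #include <functional>
    #include <iostream>
    #include <vector>

    // Simplified waiter: an optime to wait for plus a notification callback.
    struct Waiter {
        long long opTime;
        std::function<void()> notify;
    };

    // Restored signalAndRemoveIf pattern: notify each matching waiter, then
    // remove it by swapping with the back and popping (order not preserved).
    void signalAndRemoveIf(std::vector<Waiter*>& list,
                           const std::function<bool(Waiter*)>& func) {
        while (true) {
            auto it = std::find_if(list.begin(), list.end(), func);
            if (it == list.end()) {
                break;
            }
            (*it)->notify();
            std::swap(*it, list.back());
            list.pop_back();
        }
    }

    int main() {
        Waiter w5{5, [] { std::cout << "waiter for opTime 5 signaled\n"; }};
        Waiter w9{9, [] { std::cout << "waiter for opTime 9 signaled\n"; }};
        std::vector<Waiter*> waiters{&w5, &w9};
        long long lastApplied = 7;  // signal everyone waiting for opTime <= 7
        signalAndRemoveIf(waiters, [&](Waiter* w) { return w->opTime <= lastApplied; });
        std::cout << "waiters remaining: " << waiters.size() << '\n';  // 1
    }
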
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 404bf0e16b9..c5f94f32e97 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -333,8 +333,6 @@ public:
virtual Status stepUpIfEligible() override;
- virtual Status abortCatchupIfNeeded() override;
-
// ================== Test support API ===================
/**
@@ -484,55 +482,16 @@ private:
kActionStartSingleNodeElection
};
- // Abstract struct that holds information about clients waiting for replication.
- // Subclasses need to define how to notify them.
- struct Waiter {
- Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern);
- virtual ~Waiter() = default;
-
- BSONObj toBSON() const;
- std::string toString() const;
- // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex.
- virtual void notify_inlock() = 0;
-
- const OpTime opTime;
- const WriteConcernOptions* writeConcern = nullptr;
- };
-
- // When ThreadWaiter gets notified, it will signal the conditional variable.
- //
- // This is used when a thread wants to block inline until the opTime is reached with the given
- // writeConcern.
- struct ThreadWaiter : public Waiter {
- ThreadWaiter(OpTime _opTime,
- const WriteConcernOptions* _writeConcern,
- stdx::condition_variable* _condVar);
- void notify_inlock() override;
-
- stdx::condition_variable* condVar = nullptr;
- };
-
- // When the waiter is notified, finishCallback will be called while holding replCoord _mutex
- // since WaiterLists are protected by _mutex.
- //
- // This is used when we want to run a callback when the opTime is reached.
- struct CallbackWaiter : public Waiter {
- using FinishFunc = stdx::function<void()>;
-
- CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback);
- void notify_inlock() override;
-
- // The callback that will be called when this waiter is notified.
- FinishFunc finishCallback = nullptr;
- };
-
- class WaiterGuard;
+ // Struct that holds information about clients waiting for replication.
+ struct WaiterInfo;
+ struct WaiterInfoGuard;
class WaiterList {
public:
- using WaiterType = Waiter*;
+ using WaiterType = WaiterInfo*;
- // Adds waiter into the list.
+ // Adds waiter into the list. Usually, the waiter will be signaled only once and then
+ // removed.
void add_inlock(WaiterType waiter);
// Returns whether waiter is found and removed.
bool remove_inlock(WaiterType waiter);
@@ -569,44 +528,6 @@ private:
typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles;
- // The state and logic of primary catchup.
- //
- // When start() is called, CatchupState will schedule the timeout callback. When we get
- // responses of the latest heartbeats from all nodes, the target time (opTime of _waiter) is
- // set.
- // The primary exits catchup mode when any of the following happens.
- // 1) My last applied optime reaches the target optime, if we've received a heartbeat from all
- // nodes.
- // 2) Catchup timeout expires.
- // 3) Primary steps down.
- // 4) The primary has to roll back to catch up.
- // 5) The primary is too stale to catch up.
- //
- // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the
- // life cycle of the state object aligns with the conceptual state.
- // In shutdown, the timeout callback will be canceled by the executor and the state is safe to
- // destroy.
- //
- // Any function of the state must be called while holding _mutex.
- class CatchupState {
- public:
- CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {}
- // start() can only be called once.
- void start_inlock();
- // Reset the state itself to destruct the state.
- void abort_inlock();
- // Heartbeat calls this function to update the target optime.
- void signalHeartbeatUpdate_inlock();
-
- private:
- ReplicationCoordinatorImpl* _repl; // Not owned.
- // Callback handle used to cancel a scheduled catchup timeout callback.
- ReplicationExecutor::CallbackHandle _timeoutCbh;
- // Handle to a Waiter that contains the current target optime to reach after which
- // we can exit catchup mode.
- std::unique_ptr<CallbackWaiter> _waiter;
- };
-
/**
* Appends a "replicationProgress" section with data for each member in set.
*/
@@ -1247,7 +1168,7 @@ private:
/**
* Finish catch-up mode and start drain mode.
*/
- void _enterDrainMode_inlock();
+ void _finishCatchingUpOplog_inlock();
/**
* Waits for the config state to leave kConfigStartingUp, which indicates that start() has
@@ -1483,10 +1404,6 @@ private:
mutable stdx::mutex _indexPrefetchMutex;
ReplSettings::IndexPrefetchConfig _indexPrefetchConfig =
ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I)
-
- // The catchup state including all catchup logic. The presence of a non-null pointer indicates
- // that the node is currently in catchup mode.
- std::unique_ptr<CatchupState> _catchupState; // (X)
};
} // namespace repl
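
The deleted CatchupState comment enumerates every way heartbeat-based catch-up could end; since the class and its documentation go away together, the exit conditions are restated here as a hypothetical enum for reference (not a type that exists in the tree):

    // Restatement of the five exit conditions from the removed CatchupState
    // comment; illustrative only.
    enum class CatchupExitReason {
        kCaughtUp,          // last applied optime reached the heartbeat target
        kTimeout,           // catchUpTimeoutMillis expired
        kSteppedDown,       // the primary stepped down
        kRollbackRequired,  // the primary must roll back to catch up
        kTooStale,          // the primary is too stale to catch up
    };
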
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 7588fb166d5..1075a7e9232 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -160,7 +160,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- simulateCatchUpAbort();
+ simulateCatchUpTimeout();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
@@ -223,6 +223,8 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->waitForElectionFinish_forTest();
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
+ // Wait for catchup check to finish.
+ simulateCatchUpTimeout();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
@@ -1278,19 +1280,15 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
class PrimaryCatchUpTest : public ReplCoordTest {
protected:
using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
- using NetworkRequestFn = stdx::function<void(const NetworkOpIter)>;
+ using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>;
- const Timestamp smallTimestamp{1, 1};
-
- executor::RemoteCommandResponse makeHeartbeatResponse(OpTime opTime) {
+ void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) {
ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest();
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
- return makeResponseStatus(hbResp.toBSON(true));
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true)));
}
void simulateSuccessfulV1Voting() {
@@ -1302,9 +1300,10 @@ protected:
log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";
ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
- // Process requests until we're primary but leave the heartbeats for the notification
+ bool hasReadyRequests = true;
+ // Process requests until we're primary and consume the heartbeats for the notification
// of election win. Exit immediately on unexpected requests.
- while (!replCoord->getMemberState().primary()) {
+ while (!replCoord->getMemberState().primary() || hasReadyRequests) {
log() << "Waiting on network in state " << replCoord->getMemberState();
net->enterNetwork();
if (net->now() < electionTimeoutWhen) {
@@ -1315,9 +1314,7 @@ protected:
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
- OpTime opTime(Timestamp(), getReplCoord()->getTerm());
- net->scheduleResponse(
- net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime));
+ replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
} else if (request.cmdObj.firstElement().fieldNameStringData() ==
"replSetRequestVotes") {
net->scheduleResponse(net->getNextReadyRequest(),
@@ -1339,11 +1336,12 @@ protected:
// executor.
getReplExec()->waitForDBWork_forTest();
net->runReadyNetworkOperations();
+ hasReadyRequests = net->hasReadyRequests();
net->exitNetwork();
}
}
- ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, bool infiniteTimeout = false) {
+ ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -1358,8 +1356,7 @@ protected:
<< "protocolVersion"
<< 1
<< "settings"
- << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis"
- << (infiniteTimeout ? -1 : 5000)));
+ << BSON("catchUpTimeoutMillis" << 5000));
assertStartSuccess(configObj, HostAndPort("node1", 12345));
ReplSetConfig config = assertMakeRSConfig(configObj);
@@ -1381,15 +1378,17 @@ protected:
return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON())));
}
- void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) {
+ void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) {
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
while (net->hasReadyRequests()) {
const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
- if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
- onHeartbeatRequest(noi);
+ if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
+ onFreshnessScanRequest(noi);
+ } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
+ replyToHeartbeatRequestAsSecondaries(noi);
} else {
log() << "Black holing unexpected request to " << request.target << ": "
<< request.cmdObj;
@@ -1400,8 +1399,7 @@ protected:
net->exitNetwork();
}
- // Response heartbeats with opTime until the given time. Exit if it sees any other request.
- void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) {
+ void replyHeartbeatsAndRunUntil(Date_t until) {
auto net = getNet();
net->enterNetwork();
while (net->now() < until) {
@@ -1409,10 +1407,9 @@ protected:
// Peek the next request
auto noi = net->getFrontOfUnscheduledQueue();
auto& request = noi->getRequest();
- log() << request.target << " at " << net->now() << " processing " << request.cmdObj;
if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
// Consume the next request
- onHeartbeatRequest(net->getNextReadyRequest());
+ replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
} else {
// Cannot consume other requests than heartbeats.
net->exitNetwork();
@@ -1423,153 +1420,126 @@ protected:
}
net->exitNetwork();
}
-
- // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal
- // the optime waiter.
- void advanceMyLastAppliedOpTime(OpTime opTime) {
- getReplCoord()->setMyLastAppliedOpTime(opTime);
- getNet()->enterNetwork();
- getNet()->runReadyNetworkOperations();
- getNet()->exitNetwork();
- }
};
-// The first round of heartbeats indicates we are the most up-to-date.
-TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) {
+TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- int count = 0;
- processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) {
- count++;
- auto net = getNet();
- // The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1));
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
});
-
- // Get 2 heartbeats from secondaries.
- ASSERT_EQUALS(2, count);
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-// Heartbeats set a future target OpTime and we reached that successfully.
-TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
+TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- // The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
+
+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ auto request = noi->getRequest();
+ log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+ getNet()->blackHole(noi);
});
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- advanceMyLastAppliedOpTime(time2);
+
+ auto net = getNet();
+ replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod());
+ ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining);
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, CatchupTimeout) {
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
- replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) {
- // Other nodes are ahead of me.
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
- });
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
- stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out"));
- auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
- Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
- ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
-}
-TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) {
- startCapturingLogMessages();
-
- OpTime time1(Timestamp(100, 1), 0);
- ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // We should get caught up by the timeout time.
- auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
- replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
- getNet()->scheduleResponse(noi, getNet()->now(), status);
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
- }
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
+
+ NetworkInterfaceMock* net = getNet();
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ // Simulate the work done by bgsync and applier threads.
+ // setMyLastAppliedOpTime() will signal the optime waiter.
+ getReplCoord()->setMyLastAppliedOpTime(time2);
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) {
+TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // We should get caught up by the timeout time.
- auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
- replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- log() << "Black holing heartbeat from " << request.target.host();
- getNet()->blackHole(noi);
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
- }
+
+ // The new primary learns of the latest OpTime.
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
+
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
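
Taken together, PrimaryCatchUpSucceeds and PrimaryCatchUpTimeout pin down the outcomes of the restored freshness-scan scheme. A compact sketch of that control flow, under the assumption (drawn only from these tests) that both success and timeout end in drain mode; the quoted strings are the log lines the tests assert on, and the types are illustrative stand-ins:

    #include <cstdint>
    #include <functional>
    #include <iostream>

    using OpTime = std::uint64_t;  // illustrative stand-in
    enum class ApplierState { Running, Draining };

    ApplierState catchUpAfterElection(
        OpTime myLastApplied,
        OpTime scanTarget,  // freshest optime found by the scan
        std::function<bool(OpTime)> waitUntilAppliedOrTimeout) {
        if (myLastApplied >= scanTarget) {
            // Nothing to catch up; enter drain mode immediately.
            return ApplierState::Draining;
        }
        if (waitUntilAppliedOrTimeout(scanTarget)) {
            std::cout << "Finished catch-up oplog after becoming primary.\n";
        } else {
            std::cout << "Cannot catch up oplog after becoming primary\n";
        }
        // Both outcomes proceed to drain; only a stepdown during catch-up
        // (tested below) leaves the applier in Running.
        return ApplierState::Draining;
    }
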
-TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) {
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // Step down immediately.
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto request = noi->getRequest();
+ log() << "Black holing request to " << request.target << ": " << request.cmdObj;
+ getNet()->blackHole(noi);
+ });
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
- ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
- ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
auto opCtx = makeOperationContext();
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
@@ -1581,25 +1551,30 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- // Step down in the middle of catchup.
- auto abortTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2;
- replyHeartbeatsAndRunUntil(abortTime, [this, time2](const NetworkOpIter noi) {
- // Other nodes are ahead of me.
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
+
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
+ auto net = getNet();
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+ auto opCtx = makeOperationContext();
+ // Simulate the applier signaling replCoord to exit drain mode.
+ // At this point, we see the stepdown and reset the states.
+ getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
- ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
- ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
- auto opCtx = makeOperationContext();
+ ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
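
The step-down test above relies on signalDrainComplete() noticing that the term has moved on and "resetting the states". A hedged sketch of that guard, inferred from the assertions in these two tests rather than from the real implementation (the actual method takes an OperationContext and holds the coordinator mutex):

    #include <cstdint>

    enum class ApplierState { Running, Draining, Stopped };

    class DrainModel {
    public:
        // Assumed semantics: completing the drain only takes effect if we
        // are still draining in the same term in which we were elected.
        void signalDrainComplete(std::int64_t termWhenElected) {
            if (_term != termWhenElected) {
                // Stepped down meanwhile: reset to Running, reject writes.
                _applierState = ApplierState::Running;
                return;
            }
            if (_applierState != ApplierState::Draining)
                return;
            _applierState = ApplierState::Stopped;
            _canAcceptWrites = true;
        }

        ApplierState _applierState = ApplierState::Draining;
        std::int64_t _term = 1;
        bool _canAcceptWrites = false;
    };
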
@@ -1611,17 +1586,24 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
+ processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
auto net = getNet();
// The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
+ net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
});
+
+ NetworkInterfaceMock* net = getNet();
ReplicationCoordinatorImpl* replCoord = getReplCoord();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- advanceMyLastAppliedOpTime(time2);
+ // Simulate the work done by bgsync and applier threads.
+ // setMyLastAppliedOpTime() will signal the optime waiter.
+ replCoord->setMyLastAppliedOpTime(time2);
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
// Step down during drain mode.
TopologyCoordinator::UpdateTermResult updateTermResult;
@@ -1635,10 +1617,9 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
simulateSuccessfulV1Voting();
ASSERT_TRUE(replCoord->getMemberState().primary());
- // No need to catch up, so we enter drain mode.
- processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
+    // No need to catch up, so we enter drain mode.

+ processFreshnessScanRequests([this](const NetworkOpIter noi) {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
});
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
auto opCtx = makeOperationContext();
@@ -1652,113 +1633,6 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
- OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(200, 1), 0);
- OpTime time3(Timestamp(300, 1), 0);
- OpTime time4(Timestamp(400, 1), 0);
-
- // 1) The primary is at time 1 at the beginning.
- ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available.
- // So the target optime is time 3.
- startCapturingLogMessages();
- auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3;
- replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
- getNet()->scheduleResponse(noi, getNet()->now(), status);
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
- }
- });
- // The node is still in catchup mode, but the target optime has been set.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
-
- // 3) Advancing its applied optime to time 2 isn't enough.
- advanceMyLastAppliedOpTime(time2);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
- // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target.
- startCapturingLogMessages();
- auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3;
- replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) {
- const RemoteCommandRequest& request = noi->getRequest();
- if (request.target.host() == "node2") {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4));
- } else {
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
- }
- });
- // The node is still in catchup mode, but the target optime has been updated.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
-
- // 5) Advancing to time 3 isn't enough now.
- advanceMyLastAppliedOpTime(time3);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
- // 6) The node catches up time 4 eventually.
- startCapturingLogMessages();
- advanceMyLastAppliedOpTime(time4);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
- stopCapturingLogMessages();
- ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime"));
- auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
- Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
- ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
-}
-
-TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) {
- startCapturingLogMessages();
-
- OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(100, 2), 0);
- ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, true);
-
- // Run time far forward and ensure we are still in catchup mode.
- // This is an arbitrary time 'far' into the future.
- auto later = getNet()->now() + config.getElectionTimeoutPeriod() * 10;
- replyHeartbeatsAndRunUntil(later, [this, &config, time2](const NetworkOpIter noi) {
- // Other nodes are ahead of me.
- getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
-
- // Simulate the heartbeats from secondaries to primary to update liveness info.
- // TODO(sz): Remove this after merging liveness info and heartbeats.
- const RemoteCommandRequest& request = noi->getRequest();
- ReplSetHeartbeatArgsV1 hbArgs;
- hbArgs.setConfigVersion(config.getConfigVersion());
- hbArgs.setSetName(config.getReplSetName());
- hbArgs.setSenderHost(request.target);
- hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId());
- hbArgs.setTerm(getReplCoord()->getTerm());
- ASSERT(hbArgs.isInitialized());
- ReplSetHeartbeatResponse response;
- ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
- });
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
- // Simulate a user initiated abort.
- ASSERT_OK(getReplCoord()->abortCatchupIfNeeded());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
-
- stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
- ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
- ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
- auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
- Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
- ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
-}
-
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 68e1a61c4ac..0495e843590 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -234,11 +234,6 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
// Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
_signalStepDownWaiter_inlock();
- // Abort catchup if we have caught up to the latest known optime after heartbeat refreshing.
- if (_catchupState) {
- _catchupState->signalHeartbeatUpdate_inlock();
- }
-
_scheduleHeartbeatToTarget_inlock(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
@@ -670,9 +665,6 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
}
_scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
}
-
- _topCoord->restartHeartbeats();
-
if (isV1ElectionProtocol()) {
for (auto&& slaveInfo : _slaveInfo) {
slaveInfo.lastUpdate = _replExecutor->now();
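
The two hunks above delete the heartbeat-driven early exit added by SERVER-26848: every processed heartbeat response poked the catch-up state, and restarting heartbeats reset each member's freshness marker. A hedged reconstruction of what that hook did, using illustrative types (the deleted scan it consumed appears in full in the topology_coordinator_impl.cpp hunk below):

    #include <boost/optional.hpp>
    #include <cstdint>

    using OpTime = std::uint64_t;  // illustrative stand-in

    struct CatchupState {
        OpTime targetOpTime = 0;
        OpTime myLastApplied = 0;
        bool done = false;

        // Called (under the coordinator mutex) after each heartbeat response.
        // 'latestKnown' models latestKnownOpTimeSinceHeartbeatRestart():
        // none until every member has responded since the last restart.
        void signalHeartbeatUpdate(boost::optional<OpTime> latestKnown) {
            if (!latestKnown)
                return;  // still waiting on some member's first response
            if (myLastApplied >= *latestKnown)
                done = true;  // caught up to the latest known optime: exit
            else
                targetOpTime = *latestKnown;  // heartbeats raised the target
        }
    };
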
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 1f3b0881d5c..e76c37898a3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -107,6 +107,11 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext opCtx,
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
replCoord->waitForElectionFinish_forTest();
+ // Wait for primary catch-up
+ net->enterNetwork();
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
@@ -5052,6 +5057,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
+ simulateCatchUpTimeout();
// Successful dry run election increases term.
ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 18397d23a23..27d62c0af1e 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -497,9 +497,5 @@ void ReplicationCoordinatorMock::setMaster(bool isMaster) {
_settings.setMaster(isMaster);
}
-Status ReplicationCoordinatorMock::abortCatchupIfNeeded() {
- return Status::OK();
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 72bc120f226..fdfa491f0ad 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -284,8 +284,6 @@ public:
return _service;
}
- virtual Status abortCatchupIfNeeded() override;
-
private:
AtomicUInt64 _snapshotNameGenerator;
ServiceContext* const _service;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index aaa00fcdae4..e5a17dd5c1a 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -323,10 +323,6 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
- // The smallest valid optime in PV1.
- OpTime opTime(Timestamp(), 0);
- hbResp.setAppliedOpTime(opTime);
- hbResp.setDurableOpTime(opTime);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true)));
} else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") {
@@ -492,30 +488,32 @@ void ReplCoordTest::disableSnapshots() {
_externalState->setAreSnapshotsEnabled(false);
}
-void ReplCoordTest::simulateCatchUpAbort() {
+void ReplCoordTest::simulateCatchUpTimeout() {
NetworkInterfaceMock* net = getNet();
- auto heartbeatTimeoutWhen =
- net->now() + getReplCoord()->getConfig().getHeartbeatTimeoutPeriodMillis();
+ auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod();
bool hasRequest = false;
net->enterNetwork();
- if (net->now() < heartbeatTimeoutWhen) {
- net->runUntil(heartbeatTimeoutWhen);
+ if (net->now() < catchUpTimeoutWhen) {
+ net->runUntil(catchUpTimeoutWhen);
}
hasRequest = net->hasReadyRequests();
+ net->exitNetwork();
+
while (hasRequest) {
+ net->enterNetwork();
auto noi = net->getNextReadyRequest();
auto request = noi->getRequest();
// Black hole heartbeat requests caused by time advance.
log() << "Black holing request to " << request.target.toString() << " : " << request.cmdObj;
net->blackHole(noi);
- if (net->now() < heartbeatTimeoutWhen) {
- net->runUntil(heartbeatTimeoutWhen);
+ if (net->now() < catchUpTimeoutWhen) {
+ net->runUntil(catchUpTimeoutWhen);
} else {
net->runReadyNetworkOperations();
}
hasRequest = net->hasReadyRequests();
+ net->exitNetwork();
}
- net->exitNetwork();
}
} // namespace repl
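
The reworked simulateCatchUpTimeout() brackets every loop iteration with enterNetwork()/exitNetwork(). A hypothetical RAII guard (not part of this change, assuming the NetworkInterfaceMock used by these tests) that would make such bracketing exception-safe:

    // Hypothetical helper, shown only to illustrate the bracketing pattern.
    class InNetworkGuard {
    public:
        explicit InNetworkGuard(NetworkInterfaceMock* net) : _net(net) {
            _net->enterNetwork();
        }
        ~InNetworkGuard() {
            _net->exitNetwork();
        }
        InNetworkGuard(const InNetworkGuard&) = delete;
        InNetworkGuard& operator=(const InNetworkGuard&) = delete;

    private:
        NetworkInterfaceMock* _net;
    };
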
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index ab2653be11d..972e2f503ae 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -253,9 +253,9 @@ protected:
void disableSnapshots();
/**
- * Timeout all heartbeat requests for primary catch-up.
+ * Timeout all freshness scan requests for primary catch-up.
*/
- void simulateCatchUpAbort();
+ void simulateCatchUpTimeout();
private:
std::unique_ptr<ReplicationCoordinatorImpl> _repl;
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 37aad7459fa..2a2fc1259fb 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -499,21 +499,6 @@ public:
*/
virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0;
- /**
- * Reset the booleans to record the last heartbeat restart.
- */
- virtual void restartHeartbeats() = 0;
-
- /**
- * Scans through all members that are 'up' and return the latest known optime, if we have
- * received (successful or failed) heartbeats from all nodes since heartbeat restart.
- *
- * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted
- * heartbeats.
- * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down.
- */
- virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const = 0;
-
protected:
TopologyCoordinator() {}
};
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 4211dee4860..1c1f53f89a1 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2648,35 +2648,5 @@ void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool support
supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
}
-void TopologyCoordinatorImpl::restartHeartbeats() {
- for (auto& hb : _hbdata) {
- hb.restart();
- }
-}
-
-boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const {
- // The smallest OpTime in PV1.
- OpTime latest(Timestamp(0, 0), 0);
- for (size_t i = 0; i < _hbdata.size(); i++) {
- auto& peer = _hbdata[i];
-
- if (static_cast<int>(i) == _selfIndex) {
- continue;
- }
- // If any heartbeat is not fresh enough, return none.
- if (!peer.isUpdatedSinceRestart()) {
- return boost::none;
- }
- // Ignore down members
- if (!peer.up()) {
- continue;
- }
- if (peer.getAppliedOpTime() > latest) {
- latest = peer.getAppliedOpTime();
- }
- }
- return latest;
-}
-
} // namespace repl
} // namespace mongo
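
For reference, the three outcomes of the scan deleted above, restated as a self-contained example over an illustrative peer list (the real loop walks _hbdata and skips _selfIndex; OpTime and Peer here are stand-ins):

    #include <boost/optional.hpp>
    #include <cstdint>
    #include <vector>

    using OpTime = std::uint64_t;  // illustrative; 0 plays OpTime(Timestamp(0, 0), 0)

    struct Peer {
        bool updatedSinceRestart;  // responded since heartbeats restarted?
        bool up;
        OpTime appliedOpTime;
    };

    // Returns boost::none if any member hasn't responded since the restart,
    // 0 (the smallest optime) if all other members are down, and otherwise
    // the maximum applied optime across 'up' members -- mirroring the
    // deleted latestKnownOpTimeSinceHeartbeatRestart().
    boost::optional<OpTime> latestKnownOpTime(const std::vector<Peer>& peers) {
        OpTime latest = 0;
        for (const auto& peer : peers) {
            if (!peer.updatedSinceRestart)
                return boost::none;  // heartbeat not fresh enough
            if (!peer.up)
                continue;  // ignore down members
            if (peer.appliedOpTime > latest)
                latest = peer.appliedOpTime;
        }
        return latest;
    }
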
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 0c268b67347..a52758ab8ed 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -246,10 +246,6 @@ public:
bool isPriorityTakeover);
virtual void setStorageEngineSupportsReadCommitted(bool supported);
- virtual void restartHeartbeats();
-
- virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
-
////////////////////////////////////////////////////////////
//
// Test support methods
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index e6865a2e649..fa3834dd03a 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -681,8 +681,7 @@ var ReplSetTest = function(opts) {
return config;
}
- // Check journaling by sending commands through the bridge if it's used.
- if (_isRunningWithoutJournaling(this.nodes[0])) {
+ if (_isRunningWithoutJournaling(replNode)) {
config[wcMajorityJournalField] = false;
}
@@ -874,11 +873,9 @@ var ReplSetTest = function(opts) {
};
this.reInitiate = function() {
- var config = this.getReplSetConfigFromNode();
- var newConfig = this.getReplSetConfig();
- // Only reset members.
- config.members = newConfig.members;
- config.version += 1;
+ var config = this.getReplSetConfig();
+ var newVersion = this.getReplSetConfigFromNode().version + 1;
+ config.version = newVersion;
this._setDefaultConfigOptions(config);