author     Siyuan Zhou <siyuan.zhou@mongodb.com>  2017-03-23 01:25:09 -0400
committer  Siyuan Zhou <siyuan.zhou@mongodb.com>  2017-04-19 18:11:10 -0400
commit     d0c851e2f4bfea514e22c97af1838640d2849a8c (patch)
tree       956467fa52d24b3892aee23caeabb43022c3dc13
parent     85472b2350952750b658178fc64bf80d8d357348 (diff)
download   mongo-d0c851e2f4bfea514e22c97af1838640d2849a8c.tar.gz
SERVER-26848 Exit catchup mode when not syncing more data.
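Note: with this change a freshly elected primary catches up using refreshed heartbeat responses instead of a freshness scan, the default catchUpTimeoutMillis moves from 2000 to -1 (wait indefinitely), and a new replSetAbortPrimaryCatchUp command ends catch-up on demand. Below is a minimal shell sketch of the new knobs; the set name and host names are illustrative assumptions, not part of this patch.

    // Bound catch-up at 30 seconds. 0 disables catch-up entirely; -1 (the new
    // default) waits until the primary reaches the latest known optime.
    rs.initiate({
        _id: "catchupDemo",
        members: [
            {_id: 0, host: "node0:27017"},
            {_id: 1, host: "node1:27017"},
            {_id: 2, host: "node2:27017"}
        ],
        settings: {catchUpTimeoutMillis: 30 * 1000}
    });

    // If the newly elected primary is stuck catching up, finish the transition now.
    db.adminCommand({replSetAbortPrimaryCatchUp: 1});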
-rw-r--r--  jstests/multiVersion/downgrade_replset.js                        |  65
-rw-r--r--  jstests/multiVersion/initialsync.js                              |   9
-rw-r--r--  jstests/replsets/catchup.js                                      | 187
-rw-r--r--  jstests/replsets/rslib.js                                        |   1
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                                     |   8
-rw-r--r--  src/mongo/db/repl/member_heartbeat_data.cpp                      |   4
-rw-r--r--  src/mongo/db/repl/member_heartbeat_data.h                        |  14
-rw-r--r--  src/mongo/db/repl/repl_set_commands.cpp                          |  35
-rw-r--r--  src/mongo/db/repl/repl_set_config.cpp                            |   9
-rw-r--r--  src/mongo/db/repl/repl_set_config.h                              |   1
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h                      |   5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp               | 283
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                 |  97
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp | 326
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp     |   8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp          |   6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp               |   4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h                 |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp       |  22
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.h         |   4
-rw-r--r--  src/mongo/db/repl/topology_coordinator.h                         |  15
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.cpp                  |  30
-rw-r--r--  src/mongo/db/repl/topology_coordinator_impl.h                    |   4
-rw-r--r--  src/mongo/shell/replsettest.js                                   |  11
24 files changed, 820 insertions(+), 330 deletions(-)
diff --git a/jstests/multiVersion/downgrade_replset.js b/jstests/multiVersion/downgrade_replset.js
index 658b35813a2..022471410a1 100644
--- a/jstests/multiVersion/downgrade_replset.js
+++ b/jstests/multiVersion/downgrade_replset.js
@@ -14,38 +14,45 @@ var nodes = {
n3: {binVersion: newVersion}
};
-var rst = new ReplSetTest({name: name, nodes: nodes});
-rst.startSet();
-var replSetConfig = rst.getReplSetConfig();
-replSetConfig.protocolVersion = 0;
-rst.initiate(replSetConfig);
-
-var primary = rst.getPrimary();
-var coll = "test.foo";
-
-jsTest.log("Inserting documents into collection.");
-for (var i = 0; i < 10; i++) {
- primary.getCollection(coll).insert({_id: i, str: "hello world"});
-}
+function runDowngradeTest(protocolVersion) {
+ var rst = new ReplSetTest({name: name, nodes: nodes});
+ rst.startSet();
+ var replSetConfig = rst.getReplSetConfig();
+ replSetConfig.protocolVersion = protocolVersion;
+ // Hard-code catchup timeout to be compatible with 3.4
+ replSetConfig.settings = {catchUpTimeoutMillis: 2000};
+ rst.initiate(replSetConfig);
+
+ var primary = rst.getPrimary();
+ var coll = "test.foo";
+
+ jsTest.log("Inserting documents into collection.");
+ for (var i = 0; i < 10; i++) {
+ primary.getCollection(coll).insert({_id: i, str: "hello world"});
+ }
-function insertDocuments(rsURL, coll) {
- var coll = new Mongo(rsURL).getCollection(coll);
- var count = 10;
- while (!isFinished()) {
- assert.writeOK(coll.insert({_id: count, str: "hello world"}));
- count++;
+ function insertDocuments(rsURL, coll) {
+ var coll = new Mongo(rsURL).getCollection(coll);
+ var count = 10;
+ while (!isFinished()) {
+ assert.writeOK(coll.insert({_id: count, str: "hello world"}));
+ count++;
+ }
}
-}
-jsTest.log("Starting parallel operations during downgrade..");
-var joinFindInsert = startParallelOps(primary, insertDocuments, [rst.getURL(), coll]);
+ jsTest.log("Starting parallel operations during downgrade..");
+ var joinFindInsert = startParallelOps(primary, insertDocuments, [rst.getURL(), coll]);
-jsTest.log("Downgrading replica set..");
-rst.upgradeSet({binVersion: oldVersion});
-jsTest.log("Downgrade complete.");
+ jsTest.log("Downgrading replica set..");
+ rst.upgradeSet({binVersion: oldVersion});
+ jsTest.log("Downgrade complete.");
-primary = rst.getPrimary();
-printjson(rst.status());
+ primary = rst.getPrimary();
+ printjson(rst.status());
+
+ joinFindInsert();
+ rst.stopSet();
+}
-joinFindInsert();
-rst.stopSet();
+runDowngradeTest(0);
+runDowngradeTest(1);
diff --git a/jstests/multiVersion/initialsync.js b/jstests/multiVersion/initialsync.js
index d8c1d629fd0..a36d538a6f8 100644
--- a/jstests/multiVersion/initialsync.js
+++ b/jstests/multiVersion/initialsync.js
@@ -7,13 +7,15 @@ var newVersion = "latest";
var name = "multiversioninitsync";
-var multitest = function(replSetVersion, newNodeVersion) {
+var multitest = function(replSetVersion, newNodeVersion, configSettings) {
var nodes = {n1: {binVersion: replSetVersion}, n2: {binVersion: replSetVersion}};
print("Start up a two-node " + replSetVersion + " replica set.");
var rst = new ReplSetTest({name: name, nodes: nodes});
rst.startSet();
- rst.initiate();
+ var conf = rst.getReplSetConfig();
+ conf.settings = configSettings;
+ rst.initiate(conf);
// Wait for a primary node.
var primary = rst.getPrimary();
@@ -50,4 +52,5 @@ multitest(oldVersion, newVersion);
// Old Secondary is synced from a "latest"
// version ReplSet.
// *****************************************
-multitest(newVersion, oldVersion);
+// Hard-code catchup timeout. The default timeout on 3.5 is -1, which is invalid on 3.4.
+multitest(newVersion, oldVersion, {catchUpTimeoutMillis: 2000});
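A 3.4 binary rejects the new default of -1, so these multiversion tests pin an explicit finite catchUpTimeoutMillis whenever binaries are mixed. A minimal sketch of that pattern, assuming "last-stable" resolves to 3.4 here:

    var rst = new ReplSetTest({nodes: {n1: {binVersion: "last-stable"}, n2: {binVersion: "latest"}}});
    rst.startSet();
    var conf = rst.getReplSetConfig();
    // Hard-code a finite timeout; the new -1 default is rejected by 3.4 nodes.
    conf.settings = {catchUpTimeoutMillis: 2000};
    rst.initiate(conf);
    rst.stopSet();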
diff --git a/jstests/replsets/catchup.js b/jstests/replsets/catchup.js
index 542ad51c723..51632379463 100644
--- a/jstests/replsets/catchup.js
+++ b/jstests/replsets/catchup.js
@@ -12,6 +12,7 @@
rst.startSet();
var conf = rst.getReplSetConfig();
+ conf.members[2].priority = 0;
conf.settings = {
heartbeatIntervalMillis: 500,
electionTimeoutMillis: 10000,
@@ -34,7 +35,7 @@
node.adminCommand(verbosity);
});
- function stepUp(node) {
+ function stepUpNode(node) {
assert.soon(function() {
node.adminCommand({replSetStepUp: 1});
return node.adminCommand('replSetGetStatus').myState == ReplSetTest.State.PRIMARY;
@@ -43,12 +44,6 @@
return node;
}
- function doWrites(node) {
- for (var i = 0; i < 3; i++) {
- assert.writeOK(node.getDB("test").foo.insert({x: i}));
- }
- }
-
function checkOpInOplog(node, op, count) {
node.getDB("admin").getMongo().setSlaveOk();
var oplog = node.getDB("local")['oplog.rs'];
@@ -56,98 +51,148 @@
assert.eq(oplog.count(op), count, "op: " + tojson(op) + ", oplog: " + tojson(oplogArray));
}
- function isEarlierTimestamp(ts1, ts2) {
- if (ts1.getTime() == ts2.getTime()) {
- return ts1.getInc() < ts2.getInc();
+ // Stop replication on secondaries, do writes and step up one of the secondaries.
+ //
+ // The old primary has extra writes that are not replicated to the other nodes yet,
+ // but the new primary steps up, getting the vote from the third node "voter".
+ function stopReplicationAndEnforceNewPrimaryToCatchUp() {
+ // Write documents that cannot be replicated to secondaries in time.
+ var oldSecondaries = rst.getSecondaries();
+ var oldPrimary = rst.getPrimary();
+ stopServerReplication(oldSecondaries);
+ for (var i = 0; i < 3; i++) {
+ assert.writeOK(oldPrimary.getDB("test").foo.insert({x: i}));
}
- return ts1.getTime() < ts2.getTime();
+ var latestOpOnOldPrimary = getLatestOp(oldPrimary);
+ // New primary wins immediately, but needs to catch up.
+ var newPrimary = stepUpNode(oldSecondaries[0]);
+ rst.awaitNodesAgreeOnPrimary();
+ var latestOpOnNewPrimary = getLatestOp(newPrimary);
+ // Check this node is not writable.
+ assert.eq(newPrimary.getDB("test").isMaster().ismaster, false);
+
+ return {
+ oldSecondaries: oldSecondaries,
+ oldPrimary: oldPrimary,
+ newPrimary: newPrimary,
+ voter: oldSecondaries[1],
+ latestOpOnOldPrimary: latestOpOnOldPrimary,
+ latestOpOnNewPrimary: latestOpOnNewPrimary
+ };
+ }
+
+ function reconfigCatchUpTimeoutMillis(timeout) {
+ // Reconnect all nodes to make sure reconfig succeeds.
+ rst.nodes.forEach(reconnect);
+ // Reconfigure the replica set to change the catch-up timeout.
+ conf = rst.getReplSetConfigFromNode();
+ conf.version++;
+ conf.settings.catchUpTimeoutMillis = timeout;
+ reconfig(rst, conf);
+ rst.awaitReplication();
+ rst.awaitNodesAgreeOnPrimary();
}
- rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
+ rst.awaitReplication();
- jsTest.log("Case 1: The primary is up-to-date after freshness scan.");
+ jsTest.log("Case 1: The primary is up-to-date after refreshing heartbeats.");
// Should complete transition to primary immediately.
- var newPrimary = stepUp(rst.getSecondary());
+ var newPrimary = stepUpNode(rst.getSecondary());
rst.awaitNodesAgreeOnPrimary();
// Should win an election and finish the transition very quickly.
assert.eq(newPrimary, rst.getPrimary());
- rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
+ rst.awaitReplication();
jsTest.log("Case 2: The primary needs to catch up, succeeds in time.");
- // Write documents that cannot be replicated to secondaries in time.
- var originalSecondaries = rst.getSecondaries();
- stopServerReplication(originalSecondaries);
- doWrites(rst.getPrimary());
- var latestOp = getLatestOp(rst.getPrimary());
- // New primary wins immediately, but needs to catch up.
- newPrimary = stepUp(rst.getSecondary());
- rst.awaitNodesAgreeOnPrimary();
- // Check this node is not writable.
- assert.eq(newPrimary.getDB("test").isMaster().ismaster, false);
+ var stepUpResults = stopReplicationAndEnforceNewPrimaryToCatchUp();
+
// Disable fail point to allow replication.
- restartServerReplication(originalSecondaries);
+ restartServerReplication(stepUpResults.oldSecondaries);
// getPrimary() blocks until the primary finishes drain mode.
- assert.eq(newPrimary, rst.getPrimary());
+ assert.eq(stepUpResults.newPrimary, rst.getPrimary());
// Wait for all secondaries to catch up
rst.awaitReplication();
// Check the latest op on old primary is preserved on the new one.
- checkOpInOplog(newPrimary, latestOp, 1);
- rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
+ checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1);
+ rst.awaitReplication();
jsTest.log("Case 3: The primary needs to catch up, but has to change sync source to catch up.");
- // Write documents that cannot be replicated to secondaries in time.
- stopServerReplication(rst.getSecondaries());
- doWrites(rst.getPrimary());
- var oldPrimary = rst.getPrimary();
- originalSecondaries = rst.getSecondaries();
- latestOp = getLatestOp(oldPrimary);
- newPrimary = stepUp(originalSecondaries[0]);
- rst.awaitNodesAgreeOnPrimary();
- // Disable fail point on one of the other secondaries.
- // Wait until it catches up with the old primary.
- restartServerReplication(originalSecondaries[1]);
- assert.commandWorked(originalSecondaries[1].adminCommand({replSetSyncFrom: oldPrimary.host}));
- awaitOpTime(originalSecondaries[1], latestOp.ts);
+ stepUpResults = stopReplicationAndEnforceNewPrimaryToCatchUp();
+
+ // Disable fail point on the voter. Wait until it catches up with the old primary.
+ restartServerReplication(stepUpResults.voter);
+ assert.commandWorked(
+ stepUpResults.voter.adminCommand({replSetSyncFrom: stepUpResults.oldPrimary.host}));
+ awaitOpTime(stepUpResults.voter, stepUpResults.latestOpOnOldPrimary.ts);
// Disconnect the new primary and the old one.
- oldPrimary.disconnect(newPrimary);
+ stepUpResults.oldPrimary.disconnect(stepUpResults.newPrimary);
// Disable the failpoint, the new primary should sync from the other secondary.
- restartServerReplication(newPrimary);
- assert.eq(newPrimary, rst.getPrimary());
- checkOpInOplog(newPrimary, latestOp, 1);
+ restartServerReplication(stepUpResults.newPrimary);
+ assert.eq(stepUpResults.newPrimary, rst.getPrimary());
+ checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 1);
// Restore the broken connection
- oldPrimary.reconnect(newPrimary);
- rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
+ stepUpResults.oldPrimary.reconnect(stepUpResults.newPrimary);
+ rst.awaitReplication();
jsTest.log("Case 4: The primary needs to catch up, fails due to timeout.");
- // Reconfigure replicaset to decrease catchup timeout
- conf = rst.getReplSetConfigFromNode();
- conf.version++;
- conf.settings.catchUpTimeoutMillis = 10 * 1000;
- reconfig(rst, conf);
- rst.awaitReplication(ReplSetTest.kDefaultTimeoutMS, ReplSetTest.OpTimeType.LAST_DURABLE);
- rst.awaitNodesAgreeOnPrimary();
+ reconfigCatchUpTimeoutMillis(10 * 1000);
- // Write documents that cannot be replicated to secondaries in time.
- originalSecondaries = rst.getSecondaries();
- stopServerReplication(originalSecondaries);
- doWrites(rst.getPrimary());
- latestOp = getLatestOp(rst.getPrimary());
-
- // New primary wins immediately, but needs to catch up.
- newPrimary = stepUp(originalSecondaries[0]);
- rst.awaitNodesAgreeOnPrimary();
- var latestOpOnNewPrimary = getLatestOp(newPrimary);
+ stepUpResults = stopReplicationAndEnforceNewPrimaryToCatchUp();
// Wait until the new primary completes the transition to primary and writes a no-op.
- checkLog.contains(newPrimary, "Cannot catch up oplog after becoming primary");
- restartServerReplication(newPrimary);
- assert.eq(newPrimary, rst.getPrimary());
+ checkLog.contains(stepUpResults.newPrimary, "Catchup timed out after becoming primary");
+ restartServerReplication(stepUpResults.newPrimary);
+ assert.eq(stepUpResults.newPrimary, rst.getPrimary());
+
+ // Wait for the no-op "new primary" after winning an election, so that we know it has
+ // finished transition to primary.
+ assert.soon(function() {
+ return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary,
+ getLatestOp(stepUpResults.newPrimary)) < 0;
+ });
+ // The extra oplog entries on the old primary are not replicated to the new one.
+ checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0);
+ restartServerReplication(stepUpResults.voter);
+ rst.awaitReplication();
+
+ jsTest.log("Case 5: The primary needs to catch up with no timeout, then gets aborted.");
+ reconfigCatchUpTimeoutMillis(-1);
+ stepUpResults = stopReplicationAndEnforceNewPrimaryToCatchUp();
+
+ // Abort catchup.
+ assert.commandWorked(stepUpResults.newPrimary.adminCommand({replSetAbortPrimaryCatchUp: 1}));
// Wait for the no-op "new primary" after winning an election, so that we know it has
// finished transition to primary.
assert.soon(function() {
- return isEarlierTimestamp(latestOpOnNewPrimary.ts, getLatestOp(newPrimary).ts);
+ return rs.compareOpTimes(stepUpResults.latestOpOnOldPrimary,
+ getLatestOp(stepUpResults.newPrimary)) < 0;
});
// The extra oplog entries on the old primary are not replicated to the new one.
- checkOpInOplog(newPrimary, latestOp, 0);
- restartServerReplication(originalSecondaries[1]);
+ checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0);
+ restartServerReplication(stepUpResults.oldSecondaries);
+ rst.awaitReplication();
+ checkOpInOplog(stepUpResults.newPrimary, stepUpResults.latestOpOnOldPrimary, 0);
+
+ // TODO: Uncomment case 6 when SERVER-28751 gets fixed.
+ //
+ // jsTest.log("Case 6: The primary needs to catch up with no timeout, but steps down.");
+ // var stepUpResults = stopReplicationAndEnforceNewPrimaryToCatchUp();
+
+ // // Step-down command should abort catchup.
+ // try {
+ // printjson(stepUpResults.newPrimary.adminCommand({replSetStepDown: 60}));
+ // } catch (e) {
+ // print(e);
+ // }
+ // // Rename the primary.
+ // var steppedDownPrimary = stepUpResults.newPrimary;
+ // var newPrimary = rst.getPrimary();
+ // assert.neq(newPrimary, steppedDownPrimary);
+
+ // // Enable data replication on the stepped down primary and make sure it syncs old writes.
+ // rst.nodes.forEach(reconnect);
+ // restartServerReplication(stepUpResults.oldSecondaries);
+ // rst.awaitReplication();
+ // checkOpInOplog(steppedDownPrimary, stepUpResults.latestOpOnOldPrimary, 1);
+
})();
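Each case in the rewritten test is driven by the same sequence: halt replication on the secondaries, write on the old primary, then step up a secondary so it has to catch up. A condensed sketch of that flow, assuming the rst, stepUpNode, stopServerReplication, and restartServerReplication helpers already in scope in this test:

    var secondaries = rst.getSecondaries();
    var oldPrimary = rst.getPrimary();
    stopServerReplication(secondaries);                    // let the secondaries fall behind
    assert.writeOK(oldPrimary.getDB("test").foo.insert({x: 1}));
    var newPrimary = stepUpNode(secondaries[0]);           // wins the election, must catch up
    assert.eq(false, newPrimary.getDB("test").isMaster().ismaster);  // not yet writable
    restartServerReplication(secondaries);                 // allow it to fetch the missing op
    assert.eq(newPrimary, rst.getPrimary());               // catch-up and drain have finished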
diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js
index 1471824bd8f..5911723d717 100644
--- a/jstests/replsets/rslib.js
+++ b/jstests/replsets/rslib.js
@@ -162,6 +162,7 @@ var getLastOpTime;
if (!isNetworkError(e)) {
throw e;
}
+ print("Calling replSetReconfig failed. " + tojson(e));
}
var master = rs.getPrimary().getDB("admin");
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index df792fa6fdc..9d129284de0 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -313,13 +313,12 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// All (accessible) sync sources were too stale.
- // TODO: End catchup mode early if we are too stale.
if (_replCoord->getMemberState().primary()) {
warning() << "Too stale to catch up.";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen
<< " from " << syncSourceResp.getSyncSource();
- sleepsecs(1);
+ _replCoord->abortCatchupIfNeeded();
return;
}
@@ -568,9 +567,8 @@ void BackgroundSync::_runRollback(OperationContext* opCtx,
int requiredRBID,
StorageInterface* storageInterface) {
if (_replCoord->getMemberState().primary()) {
- // TODO: Abort catchup mode early if rollback detected.
- warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
- sleepsecs(1);
+ warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode.";
+ _replCoord->abortCatchupIfNeeded();
return;
}
diff --git a/src/mongo/db/repl/member_heartbeat_data.cpp b/src/mongo/db/repl/member_heartbeat_data.cpp
index c267a6ba8ed..1b9b9ea3f13 100644
--- a/src/mongo/db/repl/member_heartbeat_data.cpp
+++ b/src/mongo/db/repl/member_heartbeat_data.cpp
@@ -54,6 +54,8 @@ void MemberHeartbeatData::setUpValues(Date_t now,
}
_authIssue = false;
_lastHeartbeat = now;
+ _updatedSinceRestart = true;
+
if (!hbResponse.hasState()) {
hbResponse.setState(MemberState::RS_UNKNOWN);
}
@@ -77,6 +79,7 @@ void MemberHeartbeatData::setDownValues(Date_t now, const std::string& heartbeat
_upSince = Date_t();
_lastHeartbeat = now;
_authIssue = false;
+ _updatedSinceRestart = true;
_lastResponse = ReplSetHeartbeatResponse();
_lastResponse.setState(MemberState::RS_DOWN);
@@ -91,6 +94,7 @@ void MemberHeartbeatData::setAuthIssue(Date_t now) {
_upSince = Date_t();
_lastHeartbeat = now;
_authIssue = true;
+ _updatedSinceRestart = true;
_lastResponse = ReplSetHeartbeatResponse();
_lastResponse.setState(MemberState::RS_UNKNOWN);
diff --git a/src/mongo/db/repl/member_heartbeat_data.h b/src/mongo/db/repl/member_heartbeat_data.h
index 0f5dacd9081..f67a0a87757 100644
--- a/src/mongo/db/repl/member_heartbeat_data.h
+++ b/src/mongo/db/repl/member_heartbeat_data.h
@@ -123,6 +123,17 @@ public:
*/
void setAuthIssue(Date_t now);
+ /**
+ * Reset the flag so we can track whether a heartbeat has been received since the last restart.
+ */
+ void restart() {
+ _updatedSinceRestart = false;
+ }
+
+ bool isUpdatedSinceRestart() const {
+ return _updatedSinceRestart;
+ }
+
private:
// -1 = not checked yet, 0 = member is down/unreachable, 1 = member is up
int _health;
@@ -139,6 +150,9 @@ private:
// The last heartbeat response we received.
ReplSetHeartbeatResponse _lastResponse;
+
+ // Have we received heartbeats since the last restart?
+ bool _updatedSinceRestart = false;
};
} // namespace repl
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index 6348f72bfbd..09226ba0237 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -868,7 +868,7 @@ public:
status = getGlobalReplicationCoordinator()->stepUpIfEligible();
if (!status.isOK()) {
- log() << "replSetStepUp request failed " << causedBy(status);
+ log() << "replSetStepUp request failed" << causedBy(status);
}
return appendCommandStatus(result, status);
@@ -880,5 +880,38 @@ private:
}
} cmdReplSetStepUp;
+class CmdReplSetAbortPrimaryCatchUp : public ReplSetCommand {
+public:
+ virtual void help(stringstream& help) const {
+ help << "{ CmdReplSetAbortPrimaryCatchUp : 1 }\n";
+ help << "Abort primary catch-up mode; immediately finish the transition to primary "
+ "without fetching any further unreplicated writes from any other online nodes";
+ }
+
+ CmdReplSetAbortPrimaryCatchUp() : ReplSetCommand("replSetAbortPrimaryCatchUp") {}
+
+ virtual bool run(OperationContext* opCtx,
+ const string&,
+ BSONObj& cmdObj,
+ string& errmsg,
+ BSONObjBuilder& result) override {
+ Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result);
+ if (!status.isOK())
+ return appendCommandStatus(result, status);
+ log() << "Received replSetAbortPrimaryCatchUp request";
+
+ status = getGlobalReplicationCoordinator()->abortCatchupIfNeeded();
+ if (!status.isOK()) {
+ log() << "replSetAbortPrimaryCatchUp request failed" << causedBy(status);
+ }
+ return appendCommandStatus(result, status);
+ }
+
+private:
+ ActionSet getAuthActionSet() const override {
+ return ActionSet{ActionType::replSetStateChange};
+ }
+} cmdReplSetAbortPrimaryCatchUp;
+
} // namespace repl
} // namespace mongo
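From the shell the new command can be exercised roughly as sketched below; it only succeeds on a primary that is still in catch-up mode, otherwise the coordinator (see replication_coordinator_impl.cpp below) returns IllegalOperation, and under protocol version 0 it returns CommandNotSupported.

    var res = db.adminCommand({replSetAbortPrimaryCatchUp: 1});
    if (!res.ok) {
        // e.g. "The node is not in catch-up mode." when catch-up has already finished.
        print("abort catch-up failed: " + tojson(res));
    }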
diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp
index f7140cc56bf..2fbbd7cad52 100644
--- a/src/mongo/db/repl/repl_set_config.cpp
+++ b/src/mongo/db/repl/repl_set_config.cpp
@@ -44,6 +44,7 @@ namespace repl {
const size_t ReplSetConfig::kMaxMembers;
const size_t ReplSetConfig::kMaxVotingMembers;
+const Milliseconds ReplSetConfig::kInfiniteCatchUpTimeout(-1);
const std::string ReplSetConfig::kConfigServerFieldName = "configsvr";
const std::string ReplSetConfig::kVersionFieldName = "version";
@@ -51,7 +52,7 @@ const std::string ReplSetConfig::kMajorityWriteConcernModeName = "$majority";
const Milliseconds ReplSetConfig::kDefaultHeartbeatInterval(2000);
const Seconds ReplSetConfig::kDefaultHeartbeatTimeoutPeriod(10);
const Milliseconds ReplSetConfig::kDefaultElectionTimeoutPeriod(10000);
-const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(2000);
+const Milliseconds ReplSetConfig::kDefaultCatchUpTimeoutPeriod(kInfiniteCatchUpTimeout);
const bool ReplSetConfig::kDefaultChainingAllowed(true);
namespace {
@@ -270,14 +271,14 @@ Status ReplSetConfig::_parseSettingsSubdocument(const BSONObj& settings) {
//
// Parse catchUpTimeoutMillis
//
- auto notLessThanZero = stdx::bind(std::greater_equal<long long>(), stdx::placeholders::_1, 0);
+ auto validCatchUpTimeout = [](long long timeout) { return timeout >= 0LL || timeout == -1LL; };
long long catchUpTimeoutMillis;
Status catchUpTimeoutStatus = bsonExtractIntegerFieldWithDefaultIf(
settings,
kCatchUpTimeoutFieldName,
durationCount<Milliseconds>(kDefaultCatchUpTimeoutPeriod),
- notLessThanZero,
- "catch-up timeout must be greater than or equal to 0",
+ validCatchUpTimeout,
+ "catch-up timeout must be positive, 0 (no catch-up) or -1 (infinite catch-up).",
&catchUpTimeoutMillis);
if (!catchUpTimeoutStatus.isOK()) {
return catchUpTimeoutStatus;
diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h
index e44e51b12a7..63d5bcb80e8 100644
--- a/src/mongo/db/repl/repl_set_config.h
+++ b/src/mongo/db/repl/repl_set_config.h
@@ -59,6 +59,7 @@ public:
static const size_t kMaxMembers = 50;
static const size_t kMaxVotingMembers = 7;
+ static const Milliseconds kInfiniteCatchUpTimeout;
static const Milliseconds kDefaultElectionTimeoutPeriod;
static const Milliseconds kDefaultHeartbeatInterval;
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 8446e718d30..c53f5a7b655 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -880,6 +880,11 @@ public:
virtual ServiceContext* getServiceContext() = 0;
+ /**
+ * Abort catchup if the node is in catchup mode.
+ */
+ virtual Status abortCatchupIfNeeded() = 0;
+
protected:
ReplicationCoordinator();
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f42e712ffd3..301f2c92c7c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -155,54 +155,45 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
return toBSON().toString();
}
-struct ReplicationCoordinatorImpl::WaiterInfo {
+ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
+ : opTime(std::move(_opTime)), writeConcern(_writeConcern) {}
- using FinishFunc = stdx::function<void()>;
+BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("opTime", opTime.toBSON());
+ if (writeConcern) {
+ bob.append("writeConcern", writeConcern->toBSON());
+ }
+ return bob.obj();
+};
- WaiterInfo(unsigned int _opID,
- const OpTime _opTime,
- const WriteConcernOptions* _writeConcern,
- stdx::condition_variable* _condVar)
- : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {}
+std::string ReplicationCoordinatorImpl::Waiter::toString() const {
+ return toBSON().toString();
+};
- // When waiter is signaled, finishCallback will be called while holding replCoord _mutex
- // since WaiterLists are protected by _mutex.
- WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback)
- : opTime(_opTime), finishCallback(_finishCallback) {}
- BSONObj toBSON() const {
- BSONObjBuilder bob;
- bob.append("opId", opID);
- bob.append("opTime", opTime.toBSON());
- if (writeConcern) {
- bob.append("writeConcern", writeConcern->toBSON());
- }
- return bob.obj();
- };
+ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
+ const WriteConcernOptions* _writeConcern,
+ stdx::condition_variable* _condVar)
+ : Waiter(_opTime, _writeConcern), condVar(_condVar) {}
- std::string toString() const {
- return toBSON().toString();
- };
+void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
+ invariant(condVar);
+ condVar->notify_all();
+}
- // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex.
- void notify() {
- if (condVar) {
- condVar->notify_all();
- }
- if (finishCallback) {
- finishCallback();
- }
- }
+ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime,
+ FinishFunc _finishCallback)
+ : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {}
+
+void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() {
+ invariant(finishCallback);
+ finishCallback();
+}
- const unsigned int opID = 0;
- const OpTime opTime;
- const WriteConcernOptions* writeConcern = nullptr;
- stdx::condition_variable* condVar = nullptr;
- // The callback that will be called when this waiter is notified.
- FinishFunc finishCallback = nullptr;
-};
-struct ReplicationCoordinatorImpl::WaiterInfoGuard {
+class ReplicationCoordinatorImpl::WaiterGuard {
+public:
/**
* Constructor takes the list of waiters and enqueues itself on the list, removing itself
* in the destructor.
@@ -214,23 +205,17 @@ struct ReplicationCoordinatorImpl::WaiterInfoGuard {
* _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
* of these without holding _mutex
*/
- WaiterInfoGuard(WaiterList* list,
- unsigned int opID,
- const OpTime opTime,
- const WriteConcernOptions* writeConcern,
- stdx::condition_variable* condVar)
- : waiter(opID, opTime, writeConcern, condVar), _list(list) {
- list->add_inlock(&waiter);
+ WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
+ list->add_inlock(_waiter);
}
- ~WaiterInfoGuard() {
- _list->remove_inlock(&waiter);
+ ~WaiterGuard() {
+ _list->remove_inlock(_waiter);
}
- WaiterInfo waiter;
-
private:
WaiterList* _list;
+ Waiter* _waiter;
};
void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
@@ -239,33 +224,46 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
stdx::function<bool(WaiterType)> func) {
- std::vector<WaiterType>::iterator it = _list.end();
- while (true) {
- it = std::find_if(_list.begin(), _list.end(), func);
- if (it == _list.end()) {
- break;
+ // Only advance iterator when the element doesn't match.
+ for (auto it = _list.begin(); it != _list.end();) {
+ if (!func(*it)) {
+ ++it;
+ continue;
+ }
+
+ WaiterType waiter = std::move(*it);
+ if (it == std::prev(_list.end())) {
+ // Iterator will be invalid after erasing the last element, so set it to the
+ // next one (i.e. end()).
+ it = _list.erase(it);
+ } else {
+ // Iterator is still valid after pop_back().
+ std::swap(*it, _list.back());
+ _list.pop_back();
}
- (*it)->notify();
- std::swap(*it, _list.back());
- _list.pop_back();
+
+ // It's important to call notify() after the waiter has been removed from the list
+ // since notify() might remove the waiter itself.
+ waiter->notify_inlock();
}
}
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
- for (auto& waiter : _list) {
- waiter->notify();
+ std::vector<WaiterType> list = std::move(_list);
+ // Call notify() after removing the waiters from the list.
+ for (auto& waiter : list) {
+ waiter->notify_inlock();
}
- _list.clear();
}
bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
auto it = std::find(_list.begin(), _list.end(), waiter);
- if (it != _list.end()) {
- std::swap(*it, _list.back());
- _list.pop_back();
- return true;
+ if (it == _list.end()) {
+ return false;
}
- return false;
+ std::swap(*it, _list.back());
+ _list.pop_back();
+ return true;
}
namespace {
@@ -1186,7 +1184,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
_updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);
_opTimeWaiterList.signalAndRemoveIf_inlock(
- [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; });
+ [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -1352,11 +1350,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
// We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
- WaiterInfoGuard waitInfo(
- &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar);
+ ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
+ WaiterGuard guard(&_opTimeWaiterList, &waiter);
- LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until "
- << opCtx->getDeadline();
+ LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
+ << waiter << " until " << opCtx->getDeadline();
auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
if (!waitStatus.isOK()) {
@@ -1744,8 +1742,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
- WaiterInfoGuard waitInfo(
- &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar);
+ ThreadWaiter waiter(opTime, &writeConcern, &condVar);
+ WaiterGuard guard(&_replicationWaiterList, &waiter);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
if (_inShutdown) {
@@ -1763,7 +1761,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
BSONObjBuilder progress;
_appendSlaveInfoData_inlock(&progress);
log() << "Replication for failed WC: " << writeConcern.toBSON()
- << ", waitInfo:" << waitInfo.waiter.toBSON()
+ << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
<< ", progress: " << progress.done();
}
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
@@ -2660,8 +2658,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
result = kActionFollowerModeStateChange;
}
- // Enable replication producer and applier on stepdown.
+ // Exit catchup mode if we're in it and enable replication producer and applier on stepdown.
if (_memberState.primary()) {
+ if (_catchupState) {
+ _catchupState->abort_inlock();
+ }
_applierState = ApplierState::Running;
_externalState->startProducerIfStopped();
}
@@ -2765,13 +2766,18 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
invariant(nextAction != kActionWinElection);
lk.unlock();
_performPostMemberStateUpdateAction(nextAction);
- // Notify all secondaries of the election win.
lk.lock();
- _scheduleElectionWinNotification_inlock();
+ if (!_getMemberState_inlock().primary()) {
+ break;
+ }
+ // Notify all secondaries of the election win.
+ _restartHeartbeats_inlock();
if (isV1ElectionProtocol()) {
- _scanOpTimeForCatchUp_inlock();
+ invariant(!_catchupState);
+ _catchupState = stdx::make_unique<CatchupState>(this);
+ _catchupState->start_inlock();
} else {
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
}
break;
}
@@ -2786,13 +2792,114 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
}
}
+void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
+ log() << "Entering primary catch-up mode.";
+
+ // No catchup in single node replica set.
+ if (_repl->_rsConfig.getNumMembers() == 1) {
+ abort_inlock();
+ return;
+ }
+
+ auto timeoutCB = [this](const CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
+ }
+ log() << "Catchup timed out after becoming primary.";
+ stdx::lock_guard<stdx::mutex> lk(_repl->_mutex);
+ abort_inlock();
+ };
+
+ // Schedule timeout callback.
+ auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod();
+ // Deal with infinity and overflow - no timeout.
+ if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout ||
+ Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) {
+ return;
+ }
+ auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
+ auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
+ if (!status.isOK()) {
+ log() << "Failed to schedule catchup timeout work.";
+ abort_inlock();
+ return;
+ }
+ _timeoutCbh = status.getValue();
+}
+
+void ReplicationCoordinatorImpl::CatchupState::abort_inlock() {
+ invariant(_repl->_getMemberState_inlock().primary());
+
+ log() << "Exited primary catch-up mode.";
+ // Clean up its own members.
+ if (_timeoutCbh) {
+ _repl->_replExecutor->cancel(_timeoutCbh);
+ }
+ if (_waiter) {
+ _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
+ }
+
+ // Enter primary drain mode.
+ _repl->_enterDrainMode_inlock();
+ // Destruct the state itself.
+ _repl->_catchupState.reset(nullptr);
+}
+
+void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() {
+ auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart();
+ // Haven't collected all heartbeat responses.
+ if (!targetOpTime) {
+ return;
+ }
+
+ // We've caught up.
+ if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
+ log() << "Caught up to the latest known optime via heartbeats after becoming primary.";
+ abort_inlock();
+ return;
+ }
+
+ // Reset the target optime if it has changed.
+ if (_waiter && _waiter->opTime == *targetOpTime) {
+ return;
+ }
+
+ log() << "Heartbeats updated catchup target optime to " << *targetOpTime;
+ if (_waiter) {
+ _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
+ }
+ auto targetOpTimeCB = [this, targetOpTime]() {
+ // Double check the target time since stepdown may signal us too.
+ if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
+ log() << "Caught up to the latest known optime successfully after becoming primary.";
+ abort_inlock();
+ }
+ };
+ _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB);
+ _repl->_opTimeWaiterList.add_inlock(_waiter.get());
+}
+
+Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
+ if (!isV1ElectionProtocol()) {
+ return Status(ErrorCodes::CommandNotSupported,
+ "Primary catch-up is only supported by Protocol Version 1");
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_catchupState) {
+ _catchupState->abort_inlock();
+ return Status::OK();
+ }
+ return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
+}
+
void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto scanner = std::make_shared<FreshnessScanner>();
auto scanStartTime = _replExecutor->now();
auto evhStatus = scanner->start(
_replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2801,7 +2908,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2830,7 +2937,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Could not access any nodes within timeout when checking for "
<< "additional ops to apply before finishing transition to primary. "
<< "Will move forward with becoming primary anyway.";
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
@@ -2839,7 +2946,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
log() << "My optime is most up-to-date, skipping catch-up "
<< "and completing transition to primary.";
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
@@ -2852,9 +2959,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Finished catch-up oplog after becoming primary.";
}
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
};
- auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
+ auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB);
_opTimeWaiterList.add_inlock(waiterInfo.get());
auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
@@ -2867,7 +2974,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
_replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB);
}
-void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() {
+void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_applierState = ApplierState::Draining;
_externalState->stopProducer();
}
@@ -2958,7 +3065,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
}
void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
- _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) {
+ _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
return _doneWaitingForReplication_inlock(
waiter->opTime, SnapshotName::min(), *waiter->writeConcern);
});
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index c5f94f32e97..404bf0e16b9 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -333,6 +333,8 @@ public:
virtual Status stepUpIfEligible() override;
+ virtual Status abortCatchupIfNeeded() override;
+
// ================== Test support API ===================
/**
@@ -482,16 +484,55 @@ private:
kActionStartSingleNodeElection
};
- // Struct that holds information about clients waiting for replication.
- struct WaiterInfo;
- struct WaiterInfoGuard;
+ // Abstract struct that holds information about clients waiting for replication.
+ // Subclasses need to define how to notify them.
+ struct Waiter {
+ Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern);
+ virtual ~Waiter() = default;
+
+ BSONObj toBSON() const;
+ std::string toString() const;
+ // It is invalid to call notify_inlock() unless holding ReplicationCoordinatorImpl::_mutex.
+ virtual void notify_inlock() = 0;
+
+ const OpTime opTime;
+ const WriteConcernOptions* writeConcern = nullptr;
+ };
+
+ // When ThreadWaiter gets notified, it will signal the condition variable.
+ //
+ // This is used when a thread wants to block inline until the opTime is reached with the given
+ // writeConcern.
+ struct ThreadWaiter : public Waiter {
+ ThreadWaiter(OpTime _opTime,
+ const WriteConcernOptions* _writeConcern,
+ stdx::condition_variable* _condVar);
+ void notify_inlock() override;
+
+ stdx::condition_variable* condVar = nullptr;
+ };
+
+ // When the waiter is notified, finishCallback will be called while holding replCoord _mutex
+ // since WaiterLists are protected by _mutex.
+ //
+ // This is used when we want to run a callback when the opTime is reached.
+ struct CallbackWaiter : public Waiter {
+ using FinishFunc = stdx::function<void()>;
+
+ CallbackWaiter(OpTime _opTime, FinishFunc _finishCallback);
+ void notify_inlock() override;
+
+ // The callback that will be called when this waiter is notified.
+ FinishFunc finishCallback = nullptr;
+ };
+
+ class WaiterGuard;
class WaiterList {
public:
- using WaiterType = WaiterInfo*;
+ using WaiterType = Waiter*;
- // Adds waiter into the list. Usually, the waiter will be signaled only once and then
- // removed.
+ // Adds waiter into the list.
void add_inlock(WaiterType waiter);
// Returns whether waiter is found and removed.
bool remove_inlock(WaiterType waiter);
@@ -528,6 +569,44 @@ private:
typedef std::vector<executor::TaskExecutor::CallbackHandle> HeartbeatHandles;
+ // The state and logic of primary catchup.
+ //
+ // When start() is called, CatchupState schedules the timeout callback. Once fresh heartbeat
+ // responses have arrived from all nodes, the target optime (the opTime of _waiter) is
+ // set.
+ // The primary exits catchup mode when any of the following happens.
+ // 1) My last applied optime reaches the target optime, if we've received a heartbeat from all
+ // nodes.
+ // 2) Catchup timeout expires.
+ // 3) Primary steps down.
+ // 4) The primary has to roll back to catch up.
+ // 5) The primary is too stale to catch up.
+ //
+ // On abort, the state resets the pointer to itself in ReplCoordImpl. In other words, the
+ // life cycle of the state object aligns with the conceptual state.
+ // In shutdown, the timeout callback will be canceled by the executor and the state is safe to
+ // destroy.
+ //
+ // Any function of the state must be called while holding _mutex.
+ class CatchupState {
+ public:
+ CatchupState(ReplicationCoordinatorImpl* repl) : _repl(repl) {}
+ // start() can only be called once.
+ void start_inlock();
+ // Reset the state itself to destruct the state.
+ void abort_inlock();
+ // Heartbeat calls this function to update the target optime.
+ void signalHeartbeatUpdate_inlock();
+
+ private:
+ ReplicationCoordinatorImpl* _repl; // Not owned.
+ // Callback handle used to cancel a scheduled catchup timeout callback.
+ ReplicationExecutor::CallbackHandle _timeoutCbh;
+ // Handle to a Waiter that contains the current target optime to reach after which
+ // we can exit catchup mode.
+ std::unique_ptr<CallbackWaiter> _waiter;
+ };
+
/**
* Appends a "replicationProgress" section with data for each member in set.
*/
@@ -1168,7 +1247,7 @@ private:
/**
* Finish catch-up mode and start drain mode.
*/
- void _finishCatchingUpOplog_inlock();
+ void _enterDrainMode_inlock();
/**
* Waits for the config state to leave kConfigStartingUp, which indicates that start() has
@@ -1404,6 +1483,10 @@ private:
mutable stdx::mutex _indexPrefetchMutex;
ReplSettings::IndexPrefetchConfig _indexPrefetchConfig =
ReplSettings::IndexPrefetchConfig::PREFETCH_ALL; // (I)
+
+ // The catchup state including all catchup logic. The presence of a non-null pointer indicates
+ // that the node is currently in catchup mode.
+ std::unique_ptr<CatchupState> _catchupState; // (X)
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 1075a7e9232..7588fb166d5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -160,7 +160,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- simulateCatchUpTimeout();
+ simulateCatchUpAbort();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
@@ -223,8 +223,6 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->waitForElectionFinish_forTest();
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- // Wait for catchup check to finish.
- simulateCatchUpTimeout();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
@@ -1280,15 +1278,19 @@ TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase)
class PrimaryCatchUpTest : public ReplCoordTest {
protected:
using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
- using FreshnessScanFn = stdx::function<void(const NetworkOpIter)>;
+ using NetworkRequestFn = stdx::function<void(const NetworkOpIter)>;
- void replyToHeartbeatRequestAsSecondaries(const NetworkOpIter noi) {
+ const Timestamp smallTimestamp{1, 1};
+
+ executor::RemoteCommandResponse makeHeartbeatResponse(OpTime opTime) {
ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest();
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
- getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(hbResp.toBSON(true)));
+ hbResp.setAppliedOpTime(opTime);
+ hbResp.setDurableOpTime(opTime);
+ return makeResponseStatus(hbResp.toBSON(true));
}
void simulateSuccessfulV1Voting() {
@@ -1300,10 +1302,9 @@ protected:
log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";
ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
- bool hasReadyRequests = true;
- // Process requests until we're primary and consume the heartbeats for the notification
+ // Process requests until we're primary but leave the heartbeats for the notification
// of election win. Exit immediately on unexpected requests.
- while (!replCoord->getMemberState().primary() || hasReadyRequests) {
+ while (!replCoord->getMemberState().primary()) {
log() << "Waiting on network in state " << replCoord->getMemberState();
net->enterNetwork();
if (net->now() < electionTimeoutWhen) {
@@ -1314,7 +1315,9 @@ protected:
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
- replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
+ OpTime opTime(Timestamp(), getReplCoord()->getTerm());
+ net->scheduleResponse(
+ net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime));
} else if (request.cmdObj.firstElement().fieldNameStringData() ==
"replSetRequestVotes") {
net->scheduleResponse(net->getNextReadyRequest(),
@@ -1336,12 +1339,11 @@ protected:
// executor.
getReplExec()->waitForDBWork_forTest();
net->runReadyNetworkOperations();
- hasReadyRequests = net->hasReadyRequests();
net->exitNetwork();
}
}
- ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime) {
+ ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, bool infiniteTimeout = false) {
BSONObj configObj = BSON("_id"
<< "mySet"
<< "version"
@@ -1356,7 +1358,8 @@ protected:
<< "protocolVersion"
<< 1
<< "settings"
- << BSON("catchUpTimeoutMillis" << 5000));
+ << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis"
+ << (infiniteTimeout ? -1 : 5000)));
assertStartSuccess(configObj, HostAndPort("node1", 12345));
ReplSetConfig config = assertMakeRSConfig(configObj);
@@ -1378,17 +1381,15 @@ protected:
return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime.toBSON())));
}
- void processFreshnessScanRequests(FreshnessScanFn onFreshnessScanRequest) {
+ void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) {
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
while (net->hasReadyRequests()) {
const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
- if (request.cmdObj.firstElement().fieldNameStringData() == "replSetGetStatus") {
- onFreshnessScanRequest(noi);
- } else if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
- replyToHeartbeatRequestAsSecondaries(noi);
+ if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
+ onHeartbeatRequest(noi);
} else {
log() << "Black holing unexpected request to " << request.target << ": "
<< request.cmdObj;
@@ -1399,7 +1400,8 @@ protected:
net->exitNetwork();
}
- void replyHeartbeatsAndRunUntil(Date_t until) {
+ // Respond to heartbeats with opTime until the given time. Exit if it sees any other request.
+ void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) {
auto net = getNet();
net->enterNetwork();
while (net->now() < until) {
@@ -1407,9 +1409,10 @@ protected:
// Peek the next request
auto noi = net->getFrontOfUnscheduledQueue();
auto& request = noi->getRequest();
+ log() << request.target << " at " << net->now() << " processing " << request.cmdObj;
if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
// Consume the next request
- replyToHeartbeatRequestAsSecondaries(net->getNextReadyRequest());
+ onHeartbeatRequest(net->getNextReadyRequest());
} else {
// Cannot consume other requests than heartbeats.
net->exitNetwork();
@@ -1420,126 +1423,153 @@ protected:
}
net->exitNetwork();
}
+
+ // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal
+ // the optime waiter.
+ void advanceMyLastAppliedOpTime(OpTime opTime) {
+ getReplCoord()->setMyLastAppliedOpTime(opTime);
+ getNet()->enterNetwork();
+ getNet()->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+ }
};
-TEST_F(PrimaryCatchUpTest, PrimaryDoNotNeedToCatchUp) {
+// The first round of heartbeats indicates we are the most up-to-date.
+TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- processFreshnessScanRequests([this](const NetworkOpIter noi) {
- getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
+ int count = 0;
+ processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) {
+ count++;
+ auto net = getNet();
+ // All nodes report the same optime as ours, so no catch-up is needed.
+ net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1));
});
+
+ // Get 2 heartbeats from secondaries.
+ ASSERT_EQUALS(2, count);
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("My optime is most up-to-date, skipping catch-up"));
+ ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, PrimaryFreshnessScanTimeout) {
+// Heartbeats set a future target OpTime and we reached that successfully.
+TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- processFreshnessScanRequests([this](const NetworkOpIter noi) {
- auto request = noi->getRequest();
- log() << "Black holing request to " << request.target << ": " << request.cmdObj;
- getNet()->blackHole(noi);
+ processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ // The old primary accepted one more op and all nodes caught up after voting for me.
+ net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
});
-
- auto net = getNet();
- replyHeartbeatsAndRunUntil(net->now() + config.getCatchUpTimeoutPeriod());
- ASSERT_EQ((int)getReplCoord()->getApplierState(), (int)ApplierState::Draining);
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ advanceMyLastAppliedOpTime(time2);
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Could not access any nodes within timeout"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, PrimaryCatchUpSucceeds) {
+TEST_F(PrimaryCatchUpTest, CatchupTimeout) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- // The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
+ replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) {
+ // Other nodes are ahead of me.
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
});
-
- NetworkInterfaceMock* net = getNet();
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- // Simulate the work done by bgsync and applier threads.
- // setMyLastAppliedOpTime() will signal the optime waiter.
- getReplCoord()->setMyLastAppliedOpTime(time2);
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
+ ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, PrimaryCatchUpTimeout) {
+TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
- OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- // The new primary learns of the latest OpTime.
- processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ // We should get caught up by the timeout time.
+ auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
+ replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) {
+ const RemoteCommandRequest& request = noi->getRequest();
+ if (request.target.host() == "node2") {
+ auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
+ getNet()->scheduleResponse(noi, getNet()->now(), status);
+ } else {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
+ }
});
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ stopCapturingLogMessages();
+ ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
+ auto opCtx = makeOperationContext();
+ getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
+}
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
+TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+ // We should get caught up by the timeout time.
+ auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
+ replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time1](const NetworkOpIter noi) {
+ const RemoteCommandRequest& request = noi->getRequest();
+ if (request.target.host() == "node2") {
+ log() << "Black holing heartbeat from " << request.target.host();
+ getNet()->blackHole(noi);
+ } else {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
+ }
+ });
ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
+ ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime via heartbeats"));
auto opCtx = makeOperationContext();
getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
-TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
+TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) {
startCapturingLogMessages();
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
- auto request = noi->getRequest();
- log() << "Black holing request to " << request.target << ": " << request.cmdObj;
- getNet()->blackHole(noi);
- });
+ // Step down immediately.
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
auto opCtx = makeOperationContext();
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
@@ -1551,30 +1581,25 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
OpTime time1(Timestamp(100, 1), 0);
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
-
- processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
- auto net = getNet();
- // The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ // Step down in the middle of catchup.
+ auto abortTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2;
+ replyHeartbeatsAndRunUntil(abortTime, [this, time2](const NetworkOpIter noi) {
+ // Other nodes are ahead of me.
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
});
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
-
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- auto net = getNet();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
- auto opCtx = makeOperationContext();
- // Simulate the applier signaling replCoord to exit drain mode.
- // At this point, we see the stepdown and reset the states.
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
+ ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
+ auto opCtx = makeOperationContext();
Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -1586,24 +1611,17 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
- processFreshnessScanRequests([this, time2](const NetworkOpIter noi) {
+ processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
auto net = getNet();
// The old primary accepted one more op and all nodes caught up after voting for me.
- net->scheduleResponse(noi, net->now(), makeFreshnessScanResponse(time2));
+ net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
});
-
- NetworkInterfaceMock* net = getNet();
ReplicationCoordinatorImpl* replCoord = getReplCoord();
ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
- // Simulate the work done by bgsync and applier threads.
- // setMyLastAppliedOpTime() will signal the optime waiter.
- replCoord->setMyLastAppliedOpTime(time2);
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
+ advanceMyLastAppliedOpTime(time2);
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("Finished catch-up oplog after becoming primary."));
+ ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime"));
// Step down during drain mode.
TopologyCoordinator::UpdateTermResult updateTermResult;
@@ -1617,9 +1635,10 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
simulateSuccessfulV1Voting();
ASSERT_TRUE(replCoord->getMemberState().primary());
- // No need to catch-up, so we enter drain mode.
- processFreshnessScanRequests([this](const NetworkOpIter noi) {
- getNet()->scheduleResponse(noi, getNet()->now(), makeFreshnessScanResponse(OpTime()));
+ // No need to catch up, so we enter drain mode.
+ processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
+ auto net = getNet();
+ net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
});
ASSERT(replCoord->getApplierState() == ApplierState::Draining);
auto opCtx = makeOperationContext();
@@ -1633,6 +1652,113 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
+TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
+ OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(200, 1), 0);
+ OpTime time3(Timestamp(300, 1), 0);
+ OpTime time4(Timestamp(400, 1), 0);
+
+ // 1) The primary is at time 1 at the beginning.
+ ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
+
+ // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available.
+ // So the target optime is time 3.
+ startCapturingLogMessages();
+ auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3;
+ replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) {
+ const RemoteCommandRequest& request = noi->getRequest();
+ if (request.target.host() == "node2") {
+ auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
+ getNet()->scheduleResponse(noi, getNet()->now(), status);
+ } else {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
+ }
+ });
+ // The node is still in catchup mode, but the target optime has been set.
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ stopCapturingLogMessages();
+ ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
+
+ // 3) Advancing its applied optime to time 2 isn't enough.
+ advanceMyLastAppliedOpTime(time2);
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
+ // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target.
+ startCapturingLogMessages();
+ auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3;
+ replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) {
+ const RemoteCommandRequest& request = noi->getRequest();
+ if (request.target.host() == "node2") {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4));
+ } else {
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
+ }
+ });
+ // The node is still in catchup mode, but the target optime has been updated.
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ stopCapturingLogMessages();
+ ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
+
+ // 5) Advancing to time 3 isn't enough now.
+ advanceMyLastAppliedOpTime(time3);
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
+ // 6) The node eventually catches up to time 4.
+ startCapturingLogMessages();
+ advanceMyLastAppliedOpTime(time4);
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ stopCapturingLogMessages();
+ ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest known optime"));
+ auto opCtx = makeOperationContext();
+ getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
+}
+
+TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) {
+ startCapturingLogMessages();
+
+ OpTime time1(Timestamp(100, 1), 0);
+ OpTime time2(Timestamp(100, 2), 0);
+ ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, true);
+
+ // Run time far forward and ensure we are still in catchup mode.
+ // This is an arbitrary time 'far' into the future.
+ auto later = getNet()->now() + config.getElectionTimeoutPeriod() * 10;
+ replyHeartbeatsAndRunUntil(later, [this, &config, time2](const NetworkOpIter noi) {
+ // Other nodes are ahead of me.
+ getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
+
+ // Simulate the heartbeats from secondaries to primary to update liveness info.
+ // TODO(sz): Remove this after merging liveness info and heartbeats.
+ const RemoteCommandRequest& request = noi->getRequest();
+ ReplSetHeartbeatArgsV1 hbArgs;
+ hbArgs.setConfigVersion(config.getConfigVersion());
+ hbArgs.setSetName(config.getReplSetName());
+ hbArgs.setSenderHost(request.target);
+ hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId());
+ hbArgs.setTerm(getReplCoord()->getTerm());
+ ASSERT(hbArgs.isInitialized());
+ ReplSetHeartbeatResponse response;
+ ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
+ });
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+
+ // Simulate a user-initiated abort.
+ ASSERT_OK(getReplCoord()->abortCatchupIfNeeded());
+ ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest known optime"));
+ ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
+ auto opCtx = makeOperationContext();
+ getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ Lock::GlobalLock lock(opCtx.get(), MODE_IX, 1);
+ ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 0495e843590..68e1a61c4ac 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -234,6 +234,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
// Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
_signalStepDownWaiter_inlock();
+ // Exit catchup mode if we have caught up to the latest known optime after heartbeat refreshing.
+ if (_catchupState) {
+ _catchupState->signalHeartbeatUpdate_inlock();
+ }
+
_scheduleHeartbeatToTarget_inlock(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
@@ -665,6 +670,9 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock() {
}
_scheduleHeartbeatToTarget_inlock(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
}
+
+ _topCoord->restartHeartbeats();
+
if (isV1ElectionProtocol()) {
for (auto&& slaveInfo : _slaveInfo) {
slaveInfo.lastUpdate = _replExecutor->now();
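The hook above only signals the catchup state that fresh heartbeat data has arrived; the decision it drives is easier to see in isolation. Below is a minimal, self-contained sketch of that decision, assuming the contract of latestKnownOpTimeSinceHeartbeatRestart() added in this patch; the enum and function names are illustrative, not the committed interface.

#include <boost/optional.hpp>

// Illustrative sketch only: CatchupDecision and onHeartbeatUpdate() are invented
// names. OpTime stands in for repl::OpTime; any totally ordered optime type works.
enum class CatchupDecision {
    kKeepWaiting,    // not every node has responded since heartbeats restarted
    kFinishCatchup,  // "Caught up to the latest known optime via heartbeats"
    kUpdateTarget    // "Heartbeats updated catchup target optime"
};

template <typename OpTime>
CatchupDecision onHeartbeatUpdate(const boost::optional<OpTime>& latestKnownSinceRestart,
                                  const OpTime& myLastApplied) {
    if (!latestKnownSinceRestart) {
        return CatchupDecision::kKeepWaiting;
    }
    if (*latestKnownSinceRestart <= myLastApplied) {
        // Nothing newer to sync; exit catchup and enter drain mode.
        return CatchupDecision::kFinishCatchup;
    }
    // Record the new target and keep the applier running until it is reached.
    return CatchupDecision::kUpdateTarget;
}

On the kFinishCatchup path the coordinator moves the applier to ApplierState::Draining, which is the state the tests above assert after replying to heartbeats.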
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index e76c37898a3..1f3b0881d5c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -107,11 +107,6 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext opCtx,
replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
ASSERT(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
replCoord->waitForElectionFinish_forTest();
- // Wait for primary catch-up
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
-
ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
@@ -5057,7 +5052,6 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
// Single node cluster - this node should start election on setFollowerMode() completion.
replCoord->waitForElectionFinish_forTest();
- simulateCatchUpTimeout();
// Successful dry run election increases term.
ASSERT_EQUALS(initialTerm + 1, replCoord->getTerm());
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 27d62c0af1e..18397d23a23 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -497,5 +497,9 @@ void ReplicationCoordinatorMock::setMaster(bool isMaster) {
_settings.setMaster(isMaster);
}
+Status ReplicationCoordinatorMock::abortCatchupIfNeeded() {
+ return Status::OK();
+}
+
} // namespace repl
} // namespace mongo
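The mock simply reports success; how the real server exposes a user-initiated abort is outside this excerpt. The snippet below is a hypothetical caller only (the function name and log message are invented), showing the contract the InfiniteTimeoutAndAbort test exercises: call abortCatchupIfNeeded() on the coordinator and act on the returned Status.

// Hypothetical caller sketch; not taken from this patch.
Status requestCatchupAbort(ReplicationCoordinator* replCoord) {
    Status status = replCoord->abortCatchupIfNeeded();
    if (!status.isOK()) {
        log() << "Failed to abort primary catch-up mode: " << status;
    }
    return status;
}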
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index fdfa491f0ad..72bc120f226 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -284,6 +284,8 @@ public:
return _service;
}
+ virtual Status abortCatchupIfNeeded() override;
+
private:
AtomicUInt64 _snapshotNameGenerator;
ServiceContext* const _service;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index e5a17dd5c1a..aaa00fcdae4 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -323,6 +323,10 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
+ // The smallest valid optime in PV1.
+ OpTime opTime(Timestamp(), 0);
+ hbResp.setAppliedOpTime(opTime);
+ hbResp.setDurableOpTime(opTime);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true)));
} else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") {
@@ -488,32 +492,30 @@ void ReplCoordTest::disableSnapshots() {
_externalState->setAreSnapshotsEnabled(false);
}
-void ReplCoordTest::simulateCatchUpTimeout() {
+void ReplCoordTest::simulateCatchUpAbort() {
NetworkInterfaceMock* net = getNet();
- auto catchUpTimeoutWhen = net->now() + getReplCoord()->getConfig().getCatchUpTimeoutPeriod();
+ auto heartbeatTimeoutWhen =
+ net->now() + getReplCoord()->getConfig().getHeartbeatTimeoutPeriodMillis();
bool hasRequest = false;
net->enterNetwork();
- if (net->now() < catchUpTimeoutWhen) {
- net->runUntil(catchUpTimeoutWhen);
+ if (net->now() < heartbeatTimeoutWhen) {
+ net->runUntil(heartbeatTimeoutWhen);
}
hasRequest = net->hasReadyRequests();
- net->exitNetwork();
-
while (hasRequest) {
- net->enterNetwork();
auto noi = net->getNextReadyRequest();
auto request = noi->getRequest();
// Black hole heartbeat requests caused by time advance.
log() << "Black holing request to " << request.target.toString() << " : " << request.cmdObj;
net->blackHole(noi);
- if (net->now() < catchUpTimeoutWhen) {
- net->runUntil(catchUpTimeoutWhen);
+ if (net->now() < heartbeatTimeoutWhen) {
+ net->runUntil(heartbeatTimeoutWhen);
} else {
net->runReadyNetworkOperations();
}
hasRequest = net->hasReadyRequests();
- net->exitNetwork();
}
+ net->exitNetwork();
}
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index 972e2f503ae..ab2653be11d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -253,9 +253,9 @@ protected:
void disableSnapshots();
/**
- * Timeout all freshness scan request for primary catch-up.
+ * Black-hole all heartbeat requests during primary catch-up until the heartbeat timeout expires.
*/
- void simulateCatchUpTimeout();
+ void simulateCatchUpAbort();
private:
std::unique_ptr<ReplicationCoordinatorImpl> _repl;
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 2a2fc1259fb..37aad7459fa 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -499,6 +499,21 @@ public:
*/
virtual void setStorageEngineSupportsReadCommitted(bool supported) = 0;
+ /**
+ * Resets each member's flag recording whether a heartbeat has been received since the last
+ * heartbeat restart.
+ */
+ virtual void restartHeartbeats() = 0;
+
+ /**
+ * Scans through all members that are 'up' and returns the latest known optime, provided we
+ * have received a (successful or failed) heartbeat from every node since the heartbeat restart.
+ *
+ * Returns boost::none if any node has not responded to a heartbeat since we last restarted
+ * heartbeats.
+ * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if all other nodes are down.
+ */
+ virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const = 0;
+
protected:
TopologyCoordinator() {}
};
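The doc comment above fully determines the behavior; a self-contained restatement over a simplified peer snapshot may make the three cases easier to scan. The struct and function below are illustrative stand-ins only (the real implementation over _hbdata follows in topology_coordinator_impl.cpp).

#include <algorithm>
#include <vector>
#include <boost/optional.hpp>

// Simplified stand-in for MemberHeartbeatData; illustrative only.
struct PeerSnapshot {
    bool respondedSinceRestart;  // any heartbeat, successful or failed, since restart
    bool up;                     // last heartbeat succeeded
    OpTime appliedOpTime;        // only meaningful when 'up'
};

boost::optional<OpTime> latestKnownOpTime(const std::vector<PeerSnapshot>& peers) {
    OpTime latest(Timestamp(0, 0), 0);  // the smallest OpTime in PV1
    for (const auto& peer : peers) {
        if (!peer.respondedSinceRestart) {
            return boost::none;  // the freshest node may not have reported yet
        }
        if (!peer.up) {
            continue;  // down members cannot contribute a target optime
        }
        latest = std::max(latest, peer.appliedOpTime);
    }
    return latest;  // equals the smallest OpTime if all other nodes are down
}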
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 1c1f53f89a1..4211dee4860 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2648,5 +2648,35 @@ void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool support
supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
}
+void TopologyCoordinatorImpl::restartHeartbeats() {
+ for (auto& hb : _hbdata) {
+ hb.restart();
+ }
+}
+
+boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const {
+ // The smallest OpTime in PV1.
+ OpTime latest(Timestamp(0, 0), 0);
+ for (size_t i = 0; i < _hbdata.size(); i++) {
+ auto& peer = _hbdata[i];
+
+ if (static_cast<int>(i) == _selfIndex) {
+ continue;
+ }
+ // If any heartbeat is not fresh enough, return none.
+ if (!peer.isUpdatedSinceRestart()) {
+ return boost::none;
+ }
+ // Ignore down members
+ if (!peer.up()) {
+ continue;
+ }
+ if (peer.getAppliedOpTime() > latest) {
+ latest = peer.getAppliedOpTime();
+ }
+ }
+ return latest;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index a52758ab8ed..0c268b67347 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -246,6 +246,10 @@ public:
bool isPriorityTakeover);
virtual void setStorageEngineSupportsReadCommitted(bool supported);
+ virtual void restartHeartbeats();
+
+ virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
+
////////////////////////////////////////////////////////////
//
// Test support methods
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index fa3834dd03a..e6865a2e649 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -681,7 +681,8 @@ var ReplSetTest = function(opts) {
return config;
}
- if (_isRunningWithoutJournaling(replNode)) {
+ // Check journaling by sending commands through the bridge if it's used.
+ if (_isRunningWithoutJournaling(this.nodes[0])) {
config[wcMajorityJournalField] = false;
}
@@ -873,9 +874,11 @@ var ReplSetTest = function(opts) {
};
this.reInitiate = function() {
- var config = this.getReplSetConfig();
- var newVersion = this.getReplSetConfigFromNode().version + 1;
- config.version = newVersion;
+ var config = this.getReplSetConfigFromNode();
+ var newConfig = this.getReplSetConfig();
+ // Only reset members.
+ config.members = newConfig.members;
+ config.version += 1;
this._setDefaultConfigOptions(config);