summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorSuganthi Mani <suganthi.mani@mongodb.com>2018-04-24 13:43:33 -0400
committerSuganthi Mani <suganthi.mani@mongodb.com>2018-04-27 18:08:46 -0400
commit649630d10bf6c00225480fe261c042737e5d9688 (patch)
tree99cac306a382a20fe71ff81a670ec215da73d993 /src/mongo/db/repl
parent04d40938faf18bc9158e783b28f6766881fd15f5 (diff)
downloadmongo-649630d10bf6c00225480fe261c042737e5d9688.tar.gz
SERVER-33546 Adding a new field syncSourceId to replSetGetStatus cmd.
SERVER-5461 Adding a new field syncSourceHost to replSetGetStatus cmd.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp17
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp134
2 files changed, 146 insertions, 5 deletions
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 777e2f6ce9f..fe52da99b42 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -1967,6 +1967,8 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
}
response->append("lastHeartbeatMessage", "");
response->append("syncingTo", "");
+ response->append("syncSourceHost", "");
+ response->append("syncSourceId", -1);
response->append("infoMessage", _getHbmsg(now));
*result = Status(ErrorCodes::InvalidReplicaSetConfig,
@@ -1994,8 +1996,13 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
if (!_syncSource.empty() && !_iAmPrimary()) {
bb.append("syncingTo", _syncSource.toString());
+ bb.append("syncSourceHost", _syncSource.toString());
+ const MemberConfig* member = _rsConfig.findMemberByHostAndPort(_syncSource);
+ bb.append("syncSourceId", member ? member->getId() : -1);
} else {
bb.append("syncingTo", "");
+ bb.append("syncSourceHost", "");
+ bb.append("syncSourceId", -1);
}
if (_maintenanceModeCalls) {
@@ -2060,8 +2067,13 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
const HostAndPort& syncSource = it->getSyncSource();
if (!syncSource.empty() && !state.primary()) {
bb.append("syncingTo", syncSource.toString());
+ bb.append("syncSourceHost", syncSource.toString());
+ const MemberConfig* member = _rsConfig.findMemberByHostAndPort(syncSource);
+ bb.append("syncSourceId", member ? member->getId() : -1);
} else {
bb.append("syncingTo", "");
+ bb.append("syncSourceHost", "");
+ bb.append("syncSourceId", -1);
}
bb.append("infoMessage", "");
@@ -2088,8 +2100,13 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu
// Add sync source info
if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
response->append("syncingTo", _syncSource.toString());
+ response->append("syncSourceHost", _syncSource.toString());
+ const MemberConfig* member = _rsConfig.findMemberByHostAndPort(_syncSource);
+ response->append("syncSourceId", member ? member->getId() : -1);
} else {
response->append("syncingTo", "");
+ response->append("syncSourceHost", "");
+ response->append("syncSourceId", -1);
}
if (_rsConfig.isConfigServer()) {
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index fb9b4d2cb47..14d0bfef328 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -192,14 +192,16 @@ protected:
const std::string& setName,
MemberState memberState,
const OpTime& electionTime,
- const OpTime& lastOpTimeSender) {
+ const OpTime& lastOpTimeSender,
+ const HostAndPort& syncingTo = HostAndPort()) {
return _receiveHeartbeatHelper(Status::OK(),
member,
setName,
memberState,
electionTime.getTimestamp(),
lastOpTimeSender,
- Milliseconds(1));
+ Milliseconds(1),
+ syncingTo);
}
HeartbeatResponseAction receiveDownHeartbeat(
@@ -215,7 +217,8 @@ protected:
MemberState::RS_UNKNOWN,
Timestamp(),
OpTime(),
- roundTripTime);
+ roundTripTime,
+ HostAndPort());
}
HeartbeatResponseAction heartbeatFromMember(const HostAndPort& member,
@@ -229,7 +232,8 @@ protected:
memberState,
Timestamp(),
lastOpTimeSender,
- roundTripTime);
+ roundTripTime,
+ HostAndPort());
}
private:
@@ -239,7 +243,8 @@ private:
MemberState memberState,
Timestamp electionTime,
const OpTime& lastOpTimeSender,
- Milliseconds roundTripTime) {
+ Milliseconds roundTripTime,
+ const HostAndPort& syncingTo) {
ReplSetHeartbeatResponse hb;
hb.setConfigVersion(1);
hb.setState(memberState);
@@ -247,6 +252,7 @@ private:
hb.setAppliedOpTime(lastOpTimeSender);
hb.setElectionTime(electionTime);
hb.setTerm(getTopoCoord().getTerm());
+ hb.setSyncingTo(syncingTo);
StatusWith<ReplSetHeartbeatResponse> hbResponse = responseStatus.isOK()
? StatusWith<ReplSetHeartbeatResponse>(hb)
@@ -4460,10 +4466,16 @@ TEST_F(TopoCoordTest,
// These fields should all be empty, since this node has not received heartbeats and has
// no sync source yet.
ASSERT_EQUALS("", rsStatus["syncingTo"].String());
+ ASSERT_EQUALS("", rsStatus["syncSourceHost"].String());
+ ASSERT_EQUALS(-1, rsStatus["syncSourceId"].numberInt());
ASSERT_EQUALS("", member0Status["syncingTo"].String());
+ ASSERT_EQUALS("", member0Status["syncSourceHost"].String());
+ ASSERT_EQUALS(-1, member0Status["syncSourceId"].numberInt());
ASSERT_EQUALS("", member0Status["lastHeartbeatMessage"].String());
ASSERT_EQUALS("", member0Status["infoMessage"].String());
ASSERT_EQUALS("", member1Status["syncingTo"].String());
+ ASSERT_EQUALS("", member1Status["syncSourceHost"].String());
+ ASSERT_EQUALS(-1, member1Status["syncSourceId"].numberInt());
ASSERT_EQUALS("", member1Status["lastHeartbeatMessage"].String());
ASSERT_EQUALS("", member1Status["infoMessage"].String());
}
@@ -4528,15 +4540,125 @@ TEST_F(TopoCoordTest,
// Node 0 (self) has received heartbeats and has a sync source.
ASSERT_EQUALS("host1:27017", rsStatus["syncingTo"].String());
+ ASSERT_EQUALS("host1:27017", rsStatus["syncSourceHost"].String());
+ ASSERT_EQUALS(1, rsStatus["syncSourceId"].numberInt());
ASSERT_EQUALS("host1:27017", member0Status["syncingTo"].String());
+ ASSERT_EQUALS("host1:27017", member0Status["syncSourceHost"].String());
+ ASSERT_EQUALS(1, member0Status["syncSourceId"].numberInt());
ASSERT_EQUALS("syncing from: host1:27017", member0Status["infoMessage"].String());
ASSERT_EQUALS("", member0Status["lastHeartbeatMessage"].String());
ASSERT_EQUALS("", member1Status["syncingTo"].String());
+ ASSERT_EQUALS("", member1Status["syncSourceHost"].String());
+ ASSERT_EQUALS(-1, member1Status["syncSourceId"].numberInt());
ASSERT_EQUALS("", member1Status["infoMessage"].String());
ASSERT_EQUALS("", member1Status["lastHeartbeatMessage"].String());
}
}
+TEST_F(TopoCoordTest, replSetGetStatusForThreeMemberedReplicaSet) {
+
+ Date_t heartbeatTime = Date_t::fromMillisSinceEpoch(5000);
+ Seconds uptimeSecs(10);
+ Date_t curTime = heartbeatTime + uptimeSecs;
+ OpTime oplogProgress(Timestamp(3, 4), 0);
+
+ ASSERT_TRUE(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole());
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getTopoCoord().getMemberState().s);
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version"
+ << 5
+ << "settings"
+ << BSON("chainingAllowed" << false)
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 30 << "host"
+ << "hself:27017")
+ << BSON("_id" << 20 << "host"
+ << "hprimary:27017")
+ << BSON("_id" << 10 << "host"
+ << "h1:27017"))
+ << "protocolVersion"
+ << 1),
+ 0);
+
+ ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+
+ // Receive heartbeats and choose a sync source.
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ OpTime election = OpTime();
+
+ // Record two rounds of pings so the node can pick a sync source.
+ receiveUpHeartbeat(
+ HostAndPort("hprimary"), "rs0", MemberState::RS_PRIMARY, election, oplogProgress);
+ receiveUpHeartbeat(
+ HostAndPort("hprimary"), "rs0", MemberState::RS_PRIMARY, election, oplogProgress);
+
+ // Mimic that h1 sends a heartbeat response with hprimary as syncsource.
+ receiveUpHeartbeat(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ oplogProgress,
+ HostAndPort("hprimary"));
+ receiveUpHeartbeat(HostAndPort("h1"),
+ "rs0",
+ MemberState::RS_SECONDARY,
+ election,
+ oplogProgress,
+ HostAndPort("hprimary"));
+
+ // Since chainingAllowed is disabled, hself should choose hprimary.
+ getTopoCoord().chooseNewSyncSource(
+ now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration);
+ ASSERT_EQUALS(HostAndPort("hprimary"), getTopoCoord().getSyncSourceAddress());
+
+ BSONObjBuilder statusBuilder;
+ Status resultStatus(ErrorCodes::InternalError, "prepareStatusResponse didn't set result");
+ getTopoCoord().prepareStatusResponse(
+ TopologyCoordinator::ReplSetStatusArgs{
+ curTime,
+ static_cast<unsigned>(durationCount<Seconds>(uptimeSecs)),
+ OpTime(),
+ BSONObj()},
+ &statusBuilder,
+ &resultStatus);
+
+ ASSERT_OK(resultStatus);
+ BSONObj rsStatus = statusBuilder.obj();
+ BSONObj member0Status = rsStatus["members"].Array()[0].Obj();
+ BSONObj member1Status = rsStatus["members"].Array()[1].Obj();
+ BSONObj member2Status = rsStatus["members"].Array()[2].Obj();
+
+ ASSERT_EQUALS("hprimary:27017", rsStatus["syncingTo"].String());
+ ASSERT_EQUALS("hprimary:27017", rsStatus["syncSourceHost"].String());
+ ASSERT_EQUALS(20, rsStatus["syncSourceId"].numberInt());
+
+ // h1
+ ASSERT_EQUALS(10, member0Status["_id"].numberInt());
+ ASSERT_EQUALS("hprimary:27017", member0Status["syncingTo"].String());
+ ASSERT_EQUALS("hprimary:27017", member0Status["syncSourceHost"].String());
+ ASSERT_EQUALS(20, member0Status["syncSourceId"].numberInt());
+ ASSERT_EQUALS("", member0Status["infoMessage"].String());
+ ASSERT_EQUALS("", member0Status["lastHeartbeatMessage"].String());
+
+ // hprimary
+ ASSERT_EQUALS(20, member1Status["_id"].numberInt());
+ ASSERT_EQUALS("", member1Status["syncingTo"].String());
+ ASSERT_EQUALS("", member1Status["syncSourceHost"].String());
+ ASSERT_EQUALS(-1, member1Status["syncSourceId"].numberInt());
+ ASSERT_EQUALS("", member1Status["infoMessage"].String());
+ ASSERT_EQUALS("", member1Status["lastHeartbeatMessage"].String());
+
+ // hself
+ ASSERT_EQUALS(30, member2Status["_id"].numberInt());
+ ASSERT_EQUALS("hprimary:27017", member2Status["syncingTo"].String());
+ ASSERT_EQUALS("hprimary:27017", member2Status["syncSourceHost"].String());
+ ASSERT_EQUALS(20, member2Status["syncSourceId"].numberInt());
+ ASSERT_EQUALS("syncing from primary: hprimary:27017", member2Status["infoMessage"].String());
+ ASSERT_EQUALS("", member2Status["lastHeartbeatMessage"].String());
+}
+
TEST_F(TopoCoordTest, StatusResponseAlwaysIncludesStringStatusFieldsForNonMembers) {
Date_t heartbeatTime = Date_t::fromMillisSinceEpoch(5000);
Seconds uptimeSecs(10);
@@ -4575,6 +4697,8 @@ TEST_F(TopoCoordTest, StatusResponseAlwaysIncludesStringStatusFieldsForNonMember
// These fields should all be empty, since this node is not a member of a replica set.
ASSERT_EQUALS("", rsStatus["lastHeartbeatMessage"].String());
ASSERT_EQUALS("", rsStatus["syncingTo"].String());
+ ASSERT_EQUALS("", rsStatus["syncSourceHost"].String());
+ ASSERT_EQUALS(-1, rsStatus["syncSourceId"].numberInt());
ASSERT_EQUALS("", rsStatus["infoMessage"].String());
}