summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2014-08-19 18:26:21 -0400
committerSpencer T Brody <spencer@mongodb.com>2014-08-20 17:09:24 -0400
commitf6490541d0bb400aec87064ead946dfc52309274 (patch)
tree28512901d724b6d7b2d8c2bd5316e66cf8837363 /src
parentf9a0f6380d1858bb8c8feeb008276c680d68d7ba (diff)
downloadmongo-f6490541d0bb400aec87064ead946dfc52309274.tar.gz
SERVER-14454 Implement processReplSetFresh in ReplicationCoordinatorImpl
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp18
-rw-r--r--src/mongo/db/repl/topology_coordinator.h17
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp98
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h28
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp166
5 files changed, 251 insertions, 76 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 0b9952d501e..5026d8769d1 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -843,8 +843,22 @@ namespace repl {
Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
BSONObjBuilder* resultObj) {
- // TODO
- return Status::OK();
+
+ Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&TopologyCoordinator::prepareFreshResponse,
+ _topCoord.get(),
+ stdx::placeholders::_1,
+ args,
+ _getLastOpApplied_inlock(),
+ resultObj,
+ &result));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
+ }
+ fassert(18652, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ return result;
}
Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args,
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 616052fa817..a844fe575a2 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -48,6 +48,7 @@ namespace repl {
class MemberHeartbeatData;
struct MemberState;
class ReplicaSetConfig;
+ class ReplSetHeartbeatArgs;
class TagSubgroup;
/**
@@ -63,10 +64,6 @@ namespace repl {
virtual ~TopologyCoordinator() {}
- // The optime of the last op marked as committed by the leader
- virtual void setCommitOkayThrough(const OpTime& optime) = 0;
- // The optime of the last op received over the network from the sync source
- virtual void setLastReceived(const OpTime& optime) = 0;
// The index into the config used when we next choose a sync source
virtual void setForceSyncSourceIndex(int index) = 0;
@@ -98,12 +95,12 @@ namespace repl {
BSONObjBuilder* response,
Status* result) = 0;
- // produce a reply to a RAFT-style RequestVote RPC
- virtual void prepareRequestVoteResponse(const Date_t now,
- const BSONObj& cmdObj,
- const OpTime& lastOpApplied,
- std::string& errmsg,
- BSONObjBuilder& result) = 0;
+ // produce a reply to a replSetFresh command
+ virtual void prepareFreshResponse(const ReplicationExecutor::CallbackData& data,
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result) = 0;
// produce a reply to a received electCmd
virtual void prepareElectCmdResponse(const Date_t now,
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 6e7afdcd7ce..1503ffa80b2 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -70,14 +70,6 @@ namespace repl {
{
}
- void TopologyCoordinatorImpl::setCommitOkayThrough(const OpTime& optime) {
- _commitOkayThrough = optime;
- }
-
- void TopologyCoordinatorImpl::setLastReceived(const OpTime& optime) {
- _lastReceived = optime;
- }
-
void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) {
invariant(_forceSyncSourceIndex < _currentConfig.getNumMembers());
_forceSyncSourceIndex = index;
@@ -329,72 +321,72 @@ namespace repl {
*result = Status::OK();
}
- // Produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command
- // The caller should validate that the message is for the correct set, and has the required data
- void TopologyCoordinatorImpl::prepareRequestVoteResponse(const Date_t now,
- const BSONObj& cmdObj,
- const OpTime& lastOpApplied,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ void TopologyCoordinatorImpl::prepareFreshResponse(
+ const ReplicationExecutor::CallbackData& data,
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result) {
- string who = cmdObj["who"].String();
- int cfgver = cmdObj["cfgver"].Int();
- OpTime opTime(cmdObj["opTime"].Date());
+ if (args.setName != _currentConfig.getReplSetName()) {
+ *result = Status(ErrorCodes::ReplicaSetNotFound,
+ str::stream() << "Wrong repl set name. Expected: " <<
+ _currentConfig.getReplSetName() <<
+ ", received: " << args.setName);
+ return;
+ }
bool weAreFresher = false;
- if( _currentConfig.getConfigVersion() > cfgver ) {
- log() << "replSet member " << who << " is not yet aware its cfg version "
- << cfgver << " is stale";
- result.append("info", "config version stale");
+ if( _currentConfig.getConfigVersion() > args.cfgver ) {
+ log() << "replSet member " << args.who << " is not yet aware its cfg version "
+ << args.cfgver << " is stale";
+ response->append("info", "config version stale");
weAreFresher = true;
}
// check not only our own optime, but any other member we can reach
- else if( opTime < _commitOkayThrough ||
- opTime < _latestKnownOpTime()) {
+ else if( args.opTime < lastOpApplied ||
+ args.opTime < _latestKnownOpTime()) {
weAreFresher = true;
}
- result.appendDate("opTime", lastOpApplied.asDate());
- result.append("fresher", weAreFresher);
+ response->appendDate("opTime", lastOpApplied.asDate());
+ response->append("fresher", weAreFresher);
- bool doVeto = _shouldVeto(cmdObj, errmsg);
- result.append("veto",doVeto);
+ std::string errmsg;
+ bool doVeto = _shouldVetoMember(args.id, lastOpApplied, &errmsg);
+ response->append("veto", doVeto);
if (doVeto) {
- result.append("errmsg", errmsg);
+ response->append("errmsg", errmsg);
}
+ *result = Status::OK();
}
- bool TopologyCoordinatorImpl::_shouldVeto(const BSONObj& cmdObj, string& errmsg) const {
- // don't veto older versions
- if (cmdObj["id"].eoo()) {
- // they won't be looking for the veto field
- return false;
- }
-
- const int id = cmdObj["id"].Int();
- const int hopefulIndex = _getMemberIndex(id);
+ bool TopologyCoordinatorImpl::_shouldVetoMember(unsigned int memberID,
+ const OpTime& lastOpApplied,
+ std::string* errmsg) const {
+ const int hopefulIndex = _getMemberIndex(memberID);
const int highestPriorityIndex = _getHighestPriorityElectableIndex();
if (hopefulIndex == -1) {
- errmsg = str::stream() << "replSet couldn't find member with id " << id;
+ *errmsg = str::stream() << "replSet couldn't find member with id " << memberID;
return true;
}
- if ((_currentPrimaryIndex != -1) &&
- (_commitOkayThrough >= _hbdata[hopefulIndex].getOpTime())) {
- // hbinfo is not updated, so we have to check the primary's last optime separately
- errmsg = str::stream() << "I am already primary, " <<
+ if ((_currentPrimaryIndex == _selfIndex) &&
+ (lastOpApplied >= _hbdata[hopefulIndex].getOpTime())) {
+ // hbinfo is not updated for ourself, so if we are primary we have to check the
+ // primary's last optime separately
+ *errmsg = str::stream() << "I am already primary, " <<
_currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() <<
" can try again once I've stepped down";
return true;
}
if (_currentPrimaryIndex != -1 &&
- (_currentConfig.getMemberAt(hopefulIndex).getId() !=
- _currentConfig.getMemberAt(_currentPrimaryIndex).getId()) &&
- (_hbdata[_currentPrimaryIndex].getOpTime() >=
- _hbdata[hopefulIndex].getOpTime())) {
+ (hopefulIndex != _currentPrimaryIndex) &&
+ (_hbdata[_currentPrimaryIndex].getOpTime() >=
+ _hbdata[hopefulIndex].getOpTime())) {
// other members might be aware of more up-to-date nodes
- errmsg = str::stream() <<
+ *errmsg = str::stream() <<
_currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() <<
" is trying to elect itself but " <<
_currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() <<
@@ -405,15 +397,15 @@ namespace repl {
if ((highestPriorityIndex != -1) &&
_currentConfig.getMemberAt(highestPriorityIndex).getPriority() >
_currentConfig.getMemberAt(hopefulIndex).getPriority()) {
- errmsg = str::stream() <<
+ *errmsg = str::stream() <<
_currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() <<
" has lower priority than " <<
_currentConfig.getMemberAt(highestPriorityIndex).getHostAndPort().toString();
return true;
}
- if (!_electableSet.count(id)) {
- errmsg = str::stream() << "I don't think "
+ if (!_electableSet.count(memberID)) {
+ *errmsg = str::stream() << "I don't think "
<< _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() <<
" is electable";
return true;
@@ -969,6 +961,10 @@ namespace repl {
}
}
+ void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
+ _currentPrimaryIndex = primaryIndex;
+ }
+
void TopologyCoordinatorImpl::prepareStatusResponse(
const ReplicationExecutor::CallbackData& data,
Date_t now,
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 0b9f130f8db..0a08e08ca69 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -72,8 +72,6 @@ namespace repl {
public:
explicit TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs);
- virtual void setCommitOkayThrough(const OpTime& optime);
- virtual void setLastReceived(const OpTime& optime);
// TODO(spencer): Can this be made private?
virtual void setForceSyncSourceIndex(int index);
@@ -102,12 +100,11 @@ namespace repl {
BSONObjBuilder* response,
Status* result);
- // produces a reply to a RAFT-style RequestVote RPC
- virtual void prepareRequestVoteResponse(const Date_t now,
- const BSONObj& cmdObj,
- const OpTime& lastOpApplied,
- std::string& errmsg,
- BSONObjBuilder& result);
+ virtual void prepareFreshResponse(const ReplicationExecutor::CallbackData& data,
+ const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const OpTime& lastOpApplied,
+ BSONObjBuilder* response,
+ Status* result);
// produces a reply to a received electCmd
virtual void prepareElectCmdResponse(const Date_t now,
@@ -164,6 +161,11 @@ namespace repl {
// call this method from outside of TopologyCoordinatorImpl or a unit test.
void _changeMemberState(const MemberState& newMemberState);
+ // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests!
+ // TODO(spencer): Remove this once we can easily call for an election in unit tests to
+ // set the current primary.
+ void _setCurrentPrimaryForTest(int primaryIndex);
+
private:
// Returns the number of heartbeat pings which have occurred.
@@ -172,9 +174,12 @@ namespace repl {
// Returns the current "ping" value for the given member by their address
virtual int _getPing(const HostAndPort& host);
- // Determines if we will veto the member in the "fresh" command response
+ // Determines if we will veto the member specified by "memberID", given that the last op
+ // we have applied locally is "lastOpApplied".
// If we veto, the errmsg will be filled in with a reason
- bool _shouldVeto(const BSONObj& cmdObj, string& errmsg) const;
+ bool _shouldVetoMember(unsigned int memberID,
+ const OpTime& lastOpApplied,
+ std::string* errmsg) const;
// Returns the index of the member with the matching id, or -1 if none match.
int _getMemberIndex(int id) const;
@@ -197,9 +202,6 @@ namespace repl {
// Scans the electable set and returns the highest priority member index
int _getHighestPriorityElectableIndex() const;
- OpTime _commitOkayThrough; // the primary's latest op that won't get rolled back
- OpTime _lastReceived; // the last op we have received from our sync source
-
// Our current state (PRIMARY, SECONDARY, etc)
MemberState _memberState;
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index b20bd59b595..131807f9986 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -583,6 +583,172 @@ namespace {
// TODO(spencer): Test electionTime and pingMs are set properly
}
+ TEST(TopologyCoordinator, PrepareFreshResponse) {
+ ReplicationExecutor::CallbackHandle cbh;
+ ReplicationExecutor::CallbackData cbData(NULL,
+ cbh,
+ Status::OK());
+ ReplicaSetConfig config;
+
+ ASSERT_OK(config.initialize(BSON("_id" << "rs0" <<
+ "version" << 10 <<
+ "members" << BSON_ARRAY(
+ BSON("_id" << 10 <<
+ "host" << "hself" <<
+ "priority" << 10) <<
+ BSON("_id" << 20 << "host" << "h1") <<
+ BSON("_id" << 30 << "host" << "h2") <<
+ BSON("_id" << 40 <<
+ "host" << "h3" <<
+ "priority" << 10)))));
+ ASSERT_OK(config.validate());
+
+ TopologyCoordinatorImpl topocoord((Seconds(999)));
+ Date_t now = 0;
+ topocoord.updateConfig(cbData, config, 0, now++, OpTime(0,0));
+
+ OpTime ourOpTime(10, 10);
+ OpTime staleOpTime(1, 1);
+
+ Status internalErrorStatus(ErrorCodes::InternalError, "didn't set status");
+
+ // Test with incorrect replset name
+ ReplicationCoordinator::ReplSetFreshArgs args;
+ args.setName = "fakeset";
+
+ BSONObjBuilder responseBuilder0;
+ Status status0 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder0, &status0);
+ ASSERT_EQUALS(ErrorCodes::ReplicaSetNotFound, status0);
+ ASSERT_TRUE(responseBuilder0.obj().isEmpty());
+
+
+ // Test with non-existent node.
+ args.setName = "rs0";
+ args.cfgver = 5; // stale config
+ args.id = 0;
+ args.who = HostAndPort("fakenode");
+ args.opTime = staleOpTime;
+
+ BSONObjBuilder responseBuilder1;
+ Status status1 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder1, &status1);
+ ASSERT_OK(status1);
+ BSONObj response1 = responseBuilder1.obj();
+ ASSERT_EQUALS("config version stale", response1["info"].String());
+ ASSERT_EQUALS(ourOpTime, OpTime(response1["opTime"].timestampValue()));
+ ASSERT_TRUE(response1["fresher"].Bool());
+ ASSERT_TRUE(response1["veto"].Bool());
+ ASSERT_EQUALS("replSet couldn't find member with id 0", response1["errmsg"].String());
+
+
+ // Test when we are primary and target node is stale.
+ args.id = 20;
+ args.cfgver = 10;
+ args.who = HostAndPort("h1");
+ args.opTime = ourOpTime;
+
+ MemberHeartbeatData h1Info(1);
+ h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), staleOpTime, "", "");
+ topocoord.updateHeartbeatData(now++, h1Info, 20, ourOpTime);
+
+ topocoord._setCurrentPrimaryForTest(0);
+
+ BSONObjBuilder responseBuilder2;
+ Status status2 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder2, &status2);
+ ASSERT_OK(status2);
+ BSONObj response2 = responseBuilder2.obj();
+ ASSERT_FALSE(response2.hasField("info"));
+ ASSERT_EQUALS(ourOpTime, OpTime(response2["opTime"].timestampValue()));
+ ASSERT_FALSE(response2["fresher"].Bool());
+ ASSERT_TRUE(response2["veto"].Bool());
+ ASSERT_EQUALS("I am already primary, h1:27017 can try again once I've stepped down",
+ response2["errmsg"].String());
+
+
+ // Test when someone else is primary and target node is stale.
+ MemberHeartbeatData h2Info(2);
+ h2Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", "");
+ topocoord.updateHeartbeatData(now++, h2Info, 30, ourOpTime);
+
+ topocoord._changeMemberState(MemberState::RS_SECONDARY);
+ topocoord._setCurrentPrimaryForTest(2);
+
+ BSONObjBuilder responseBuilder3;
+ Status status3 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder3, &status3);
+ ASSERT_OK(status3);
+ BSONObj response3 = responseBuilder3.obj();
+ ASSERT_FALSE(response3.hasField("info"));
+ ASSERT_EQUALS(ourOpTime, OpTime(response3["opTime"].timestampValue()));
+ ASSERT_FALSE(response3["fresher"].Bool());
+ ASSERT_TRUE(response3["veto"].Bool());
+ ASSERT_EQUALS(
+ "h1:27017 is trying to elect itself but h2:27017 is already primary and more "
+ "up-to-date",
+ response3["errmsg"].String());
+
+
+ // Test trying to elect a node that is caught up but isn't the highest priority node.
+ h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", "");
+ topocoord.updateHeartbeatData(now++, h1Info, 20, ourOpTime);
+ h2Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), staleOpTime, "", "");
+ topocoord.updateHeartbeatData(now++, h2Info, 30, ourOpTime);
+ MemberHeartbeatData h3Info(3);
+ h3Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", "");
+ topocoord.updateHeartbeatData(now++, h3Info, 40, ourOpTime);
+
+ BSONObjBuilder responseBuilder4;
+ Status status4 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder4, &status4);
+ ASSERT_OK(status4);
+ BSONObj response4 = responseBuilder4.obj();
+ ASSERT_FALSE(response4.hasField("info"));
+ ASSERT_EQUALS(ourOpTime, OpTime(response4["opTime"].timestampValue()));
+ ASSERT_FALSE(response4["fresher"].Bool());
+ ASSERT_TRUE(response4["veto"].Bool());
+ ASSERT_EQUALS("h1:27017 has lower priority than hself:27017", response4["errmsg"].String());
+
+
+ // Test trying to elect a node that isn't electable
+ args.id = 40;
+ args.who = HostAndPort("h3");
+
+ h3Info.setDownValues(now, "");
+ topocoord.updateHeartbeatData(now++, h3Info, 40, ourOpTime);
+
+ BSONObjBuilder responseBuilder5;
+ Status status5 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder5, &status5);
+ ASSERT_OK(status5);
+ BSONObj response5 = responseBuilder5.obj();
+ ASSERT_FALSE(response5.hasField("info"));
+ ASSERT_EQUALS(ourOpTime, OpTime(response5["opTime"].timestampValue()));
+ ASSERT_FALSE(response5["fresher"].Bool());
+ ASSERT_TRUE(response5["veto"].Bool());
+ ASSERT_EQUALS("I don't think h3:27017 is electable", response5["errmsg"].String());
+
+
+ // Finally, test trying to elect a valid node
+ args.id = 10;
+ args.who = HostAndPort("hself");
+
+ h3Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", "");
+ topocoord.updateHeartbeatData(now++, h3Info, 40, ourOpTime);
+
+ BSONObjBuilder responseBuilder6;
+ Status status6 = internalErrorStatus;
+ topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder6, &status6);
+ ASSERT_OK(status6);
+ BSONObj response6 = responseBuilder6.obj();
+ cout << response6.jsonString(TenGen, 1);
+ ASSERT_FALSE(response6.hasField("info"));
+ ASSERT_EQUALS(ourOpTime, OpTime(response6["opTime"].timestampValue()));
+ ASSERT_FALSE(response6["fresher"].Bool());
+ ASSERT_FALSE(response6["veto"].Bool());
+ ASSERT_FALSE(response6.hasField("errmsg"));
+ }
} // namespace
} // namespace repl
} // namespace mongo