diff options
author | Spencer T Brody <spencer@mongodb.com> | 2014-08-19 18:26:21 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2014-08-20 17:09:24 -0400 |
commit | f6490541d0bb400aec87064ead946dfc52309274 (patch) | |
tree | 28512901d724b6d7b2d8c2bd5316e66cf8837363 /src | |
parent | f9a0f6380d1858bb8c8feeb008276c680d68d7ba (diff) | |
download | mongo-f6490541d0bb400aec87064ead946dfc52309274.tar.gz |
SERVER-14454 Implement processReplSetFresh in ReplicationCoordinatorImpl
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 98 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_test.cpp | 166 |
5 files changed, 251 insertions, 76 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 0b9952d501e..5026d8769d1 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -843,8 +843,22 @@ namespace repl { Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args, BSONObjBuilder* resultObj) { - // TODO - return Status::OK(); + + Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse"); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&TopologyCoordinator::prepareFreshResponse, + _topCoord.get(), + stdx::placeholders::_1, + args, + _getLastOpApplied_inlock(), + resultObj, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + } + fassert(18652, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; } Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args, diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 616052fa817..a844fe575a2 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -48,6 +48,7 @@ namespace repl { class MemberHeartbeatData; struct MemberState; class ReplicaSetConfig; + class ReplSetHeartbeatArgs; class TagSubgroup; /** @@ -63,10 +64,6 @@ namespace repl { virtual ~TopologyCoordinator() {} - // The optime of the last op marked as committed by the leader - virtual void setCommitOkayThrough(const OpTime& optime) = 0; - // The optime of the last op received over the network from the sync source - virtual void setLastReceived(const OpTime& optime) = 0; // The index into the config used when we next choose a sync source virtual void setForceSyncSourceIndex(int index) = 0; @@ -98,12 +95,12 @@ namespace repl { BSONObjBuilder* response, Status* result) = 0; - // produce a reply to a RAFT-style RequestVote RPC - virtual void prepareRequestVoteResponse(const Date_t now, - const BSONObj& cmdObj, - const OpTime& lastOpApplied, - std::string& errmsg, - BSONObjBuilder& result) = 0; + // produce a reply to a replSetFresh command + virtual void prepareFreshResponse(const ReplicationExecutor::CallbackData& data, + const ReplicationCoordinator::ReplSetFreshArgs& args, + const OpTime& lastOpApplied, + BSONObjBuilder* response, + Status* result) = 0; // produce a reply to a received electCmd virtual void prepareElectCmdResponse(const Date_t now, diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 6e7afdcd7ce..1503ffa80b2 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -70,14 +70,6 @@ namespace repl { { } - void TopologyCoordinatorImpl::setCommitOkayThrough(const OpTime& optime) { - _commitOkayThrough = optime; - } - - void TopologyCoordinatorImpl::setLastReceived(const OpTime& optime) { - _lastReceived = optime; - } - void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) { invariant(_forceSyncSourceIndex < _currentConfig.getNumMembers()); _forceSyncSourceIndex = index; @@ -329,72 +321,72 @@ namespace repl { *result = Status::OK(); } - // Produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command - // The caller should validate that the message is for the correct set, and has the required data - void TopologyCoordinatorImpl::prepareRequestVoteResponse(const Date_t now, - const BSONObj& cmdObj, - const OpTime& lastOpApplied, - std::string& errmsg, - BSONObjBuilder& result) { + void TopologyCoordinatorImpl::prepareFreshResponse( + const ReplicationExecutor::CallbackData& data, + const ReplicationCoordinator::ReplSetFreshArgs& args, + const OpTime& lastOpApplied, + BSONObjBuilder* response, + Status* result) { - string who = cmdObj["who"].String(); - int cfgver = cmdObj["cfgver"].Int(); - OpTime opTime(cmdObj["opTime"].Date()); + if (args.setName != _currentConfig.getReplSetName()) { + *result = Status(ErrorCodes::ReplicaSetNotFound, + str::stream() << "Wrong repl set name. Expected: " << + _currentConfig.getReplSetName() << + ", received: " << args.setName); + return; + } bool weAreFresher = false; - if( _currentConfig.getConfigVersion() > cfgver ) { - log() << "replSet member " << who << " is not yet aware its cfg version " - << cfgver << " is stale"; - result.append("info", "config version stale"); + if( _currentConfig.getConfigVersion() > args.cfgver ) { + log() << "replSet member " << args.who << " is not yet aware its cfg version " + << args.cfgver << " is stale"; + response->append("info", "config version stale"); weAreFresher = true; } // check not only our own optime, but any other member we can reach - else if( opTime < _commitOkayThrough || - opTime < _latestKnownOpTime()) { + else if( args.opTime < lastOpApplied || + args.opTime < _latestKnownOpTime()) { weAreFresher = true; } - result.appendDate("opTime", lastOpApplied.asDate()); - result.append("fresher", weAreFresher); + response->appendDate("opTime", lastOpApplied.asDate()); + response->append("fresher", weAreFresher); - bool doVeto = _shouldVeto(cmdObj, errmsg); - result.append("veto",doVeto); + std::string errmsg; + bool doVeto = _shouldVetoMember(args.id, lastOpApplied, &errmsg); + response->append("veto", doVeto); if (doVeto) { - result.append("errmsg", errmsg); + response->append("errmsg", errmsg); } + *result = Status::OK(); } - bool TopologyCoordinatorImpl::_shouldVeto(const BSONObj& cmdObj, string& errmsg) const { - // don't veto older versions - if (cmdObj["id"].eoo()) { - // they won't be looking for the veto field - return false; - } - - const int id = cmdObj["id"].Int(); - const int hopefulIndex = _getMemberIndex(id); + bool TopologyCoordinatorImpl::_shouldVetoMember(unsigned int memberID, + const OpTime& lastOpApplied, + std::string* errmsg) const { + const int hopefulIndex = _getMemberIndex(memberID); const int highestPriorityIndex = _getHighestPriorityElectableIndex(); if (hopefulIndex == -1) { - errmsg = str::stream() << "replSet couldn't find member with id " << id; + *errmsg = str::stream() << "replSet couldn't find member with id " << memberID; return true; } - if ((_currentPrimaryIndex != -1) && - (_commitOkayThrough >= _hbdata[hopefulIndex].getOpTime())) { - // hbinfo is not updated, so we have to check the primary's last optime separately - errmsg = str::stream() << "I am already primary, " << + if ((_currentPrimaryIndex == _selfIndex) && + (lastOpApplied >= _hbdata[hopefulIndex].getOpTime())) { + // hbinfo is not updated for ourself, so if we are primary we have to check the + // primary's last optime separately + *errmsg = str::stream() << "I am already primary, " << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " can try again once I've stepped down"; return true; } if (_currentPrimaryIndex != -1 && - (_currentConfig.getMemberAt(hopefulIndex).getId() != - _currentConfig.getMemberAt(_currentPrimaryIndex).getId()) && - (_hbdata[_currentPrimaryIndex].getOpTime() >= - _hbdata[hopefulIndex].getOpTime())) { + (hopefulIndex != _currentPrimaryIndex) && + (_hbdata[_currentPrimaryIndex].getOpTime() >= + _hbdata[hopefulIndex].getOpTime())) { // other members might be aware of more up-to-date nodes - errmsg = str::stream() << + *errmsg = str::stream() << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " is trying to elect itself but " << _currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() << @@ -405,15 +397,15 @@ namespace repl { if ((highestPriorityIndex != -1) && _currentConfig.getMemberAt(highestPriorityIndex).getPriority() > _currentConfig.getMemberAt(hopefulIndex).getPriority()) { - errmsg = str::stream() << + *errmsg = str::stream() << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " has lower priority than " << _currentConfig.getMemberAt(highestPriorityIndex).getHostAndPort().toString(); return true; } - if (!_electableSet.count(id)) { - errmsg = str::stream() << "I don't think " + if (!_electableSet.count(memberID)) { + *errmsg = str::stream() << "I don't think " << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " is electable"; return true; @@ -969,6 +961,10 @@ namespace repl { } } + void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { + _currentPrimaryIndex = primaryIndex; + } + void TopologyCoordinatorImpl::prepareStatusResponse( const ReplicationExecutor::CallbackData& data, Date_t now, diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 0b9f130f8db..0a08e08ca69 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -72,8 +72,6 @@ namespace repl { public: explicit TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs); - virtual void setCommitOkayThrough(const OpTime& optime); - virtual void setLastReceived(const OpTime& optime); // TODO(spencer): Can this be made private? virtual void setForceSyncSourceIndex(int index); @@ -102,12 +100,11 @@ namespace repl { BSONObjBuilder* response, Status* result); - // produces a reply to a RAFT-style RequestVote RPC - virtual void prepareRequestVoteResponse(const Date_t now, - const BSONObj& cmdObj, - const OpTime& lastOpApplied, - std::string& errmsg, - BSONObjBuilder& result); + virtual void prepareFreshResponse(const ReplicationExecutor::CallbackData& data, + const ReplicationCoordinator::ReplSetFreshArgs& args, + const OpTime& lastOpApplied, + BSONObjBuilder* response, + Status* result); // produces a reply to a received electCmd virtual void prepareElectCmdResponse(const Date_t now, @@ -164,6 +161,11 @@ namespace repl { // call this method from outside of TopologyCoordinatorImpl or a unit test. void _changeMemberState(const MemberState& newMemberState); + // Sets _currentPrimaryIndex to the given index. Should only be used in unit tests! + // TODO(spencer): Remove this once we can easily call for an election in unit tests to + // set the current primary. + void _setCurrentPrimaryForTest(int primaryIndex); + private: // Returns the number of heartbeat pings which have occurred. @@ -172,9 +174,12 @@ namespace repl { // Returns the current "ping" value for the given member by their address virtual int _getPing(const HostAndPort& host); - // Determines if we will veto the member in the "fresh" command response + // Determines if we will veto the member specified by "memberID", given that the last op + // we have applied locally is "lastOpApplied". // If we veto, the errmsg will be filled in with a reason - bool _shouldVeto(const BSONObj& cmdObj, string& errmsg) const; + bool _shouldVetoMember(unsigned int memberID, + const OpTime& lastOpApplied, + std::string* errmsg) const; // Returns the index of the member with the matching id, or -1 if none match. int _getMemberIndex(int id) const; @@ -197,9 +202,6 @@ namespace repl { // Scans the electable set and returns the highest priority member index int _getHighestPriorityElectableIndex() const; - OpTime _commitOkayThrough; // the primary's latest op that won't get rolled back - OpTime _lastReceived; // the last op we have received from our sync source - // Our current state (PRIMARY, SECONDARY, etc) MemberState _memberState; diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index b20bd59b595..131807f9986 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -583,6 +583,172 @@ namespace { // TODO(spencer): Test electionTime and pingMs are set properly } + TEST(TopologyCoordinator, PrepareRequestVoteResponse) { + ReplicationExecutor::CallbackHandle cbh; + ReplicationExecutor::CallbackData cbData(NULL, + cbh, + Status::OK()); + ReplicaSetConfig config; + + ASSERT_OK(config.initialize(BSON("_id" << "rs0" << + "version" << 10 << + "members" << BSON_ARRAY( + BSON("_id" << 10 << + "host" << "hself" << + "priority" << 10) << + BSON("_id" << 20 << "host" << "h1") << + BSON("_id" << 30 << "host" << "h2") << + BSON("_id" << 40 << + "host" << "h3" << + "priority" << 10))))); + ASSERT_OK(config.validate()); + + TopologyCoordinatorImpl topocoord((Seconds(999))); + Date_t now = 0; + topocoord.updateConfig(cbData, config, 0, now++, OpTime(0,0)); + + OpTime ourOpTime(10, 10); + OpTime staleOpTime = (1, 1); + + Status internalErrorStatus(ErrorCodes::InternalError, "didn't set status"); + + // Test with incorrect replset name + ReplicationCoordinator::ReplSetFreshArgs args; + args.setName = "fakeset"; + + BSONObjBuilder responseBuilder0; + Status status0 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder0, &status0); + ASSERT_EQUALS(ErrorCodes::ReplicaSetNotFound, status0); + ASSERT_TRUE(responseBuilder0.obj().isEmpty()); + + + // Test with non-existent node. + args.setName = "rs0"; + args.cfgver = 5; // stale config + args.id = 0; + args.who = HostAndPort("fakenode"); + args.opTime = staleOpTime; + + BSONObjBuilder responseBuilder1; + Status status1 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder1, &status1); + ASSERT_OK(status1); + BSONObj response1 = responseBuilder1.obj(); + ASSERT_EQUALS("config version stale", response1["info"].String()); + ASSERT_EQUALS(ourOpTime, OpTime(response1["opTime"].timestampValue())); + ASSERT_TRUE(response1["fresher"].Bool()); + ASSERT_TRUE(response1["veto"].Bool()); + ASSERT_EQUALS("replSet couldn't find member with id 0", response1["errmsg"].String()); + + + // Test when we are primary and target node is stale. + args.id = 20; + args.cfgver = 10; + args.who = HostAndPort("h1"); + args.opTime = ourOpTime; + + MemberHeartbeatData h1Info(1); + h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), staleOpTime, "", ""); + topocoord.updateHeartbeatData(now++, h1Info, 20, ourOpTime); + + topocoord._setCurrentPrimaryForTest(0); + + BSONObjBuilder responseBuilder2; + Status status2 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder2, &status2); + ASSERT_OK(status2); + BSONObj response2 = responseBuilder2.obj(); + ASSERT_FALSE(response2.hasField("info")); + ASSERT_EQUALS(ourOpTime, OpTime(response2["opTime"].timestampValue())); + ASSERT_FALSE(response2["fresher"].Bool()); + ASSERT_TRUE(response2["veto"].Bool()); + ASSERT_EQUALS("I am already primary, h1:27017 can try again once I've stepped down", + response2["errmsg"].String()); + + + // Test when someone else is primary and target node is stale. + MemberHeartbeatData h2Info(2); + h2Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", ""); + topocoord.updateHeartbeatData(now++, h2Info, 30, ourOpTime); + + topocoord._changeMemberState(MemberState::RS_SECONDARY); + topocoord._setCurrentPrimaryForTest(2); + + BSONObjBuilder responseBuilder3; + Status status3 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder3, &status3); + ASSERT_OK(status3); + BSONObj response3 = responseBuilder3.obj(); + ASSERT_FALSE(response3.hasField("info")); + ASSERT_EQUALS(ourOpTime, OpTime(response3["opTime"].timestampValue())); + ASSERT_FALSE(response3["fresher"].Bool()); + ASSERT_TRUE(response3["veto"].Bool()); + ASSERT_EQUALS( + "h1:27017 is trying to elect itself but h2:27017 is already primary and more " + "up-to-date", + response3["errmsg"].String()); + + + // Test trying to elect a node that is caught up but isn't the highest priority node. + h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", ""); + topocoord.updateHeartbeatData(now++, h1Info, 20, ourOpTime); + h2Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), staleOpTime, "", ""); + topocoord.updateHeartbeatData(now++, h2Info, 30, ourOpTime); + MemberHeartbeatData h3Info(3); + h3Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", ""); + topocoord.updateHeartbeatData(now++, h3Info, 40, ourOpTime); + + BSONObjBuilder responseBuilder4; + Status status4 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder4, &status4); + ASSERT_OK(status4); + BSONObj response4 = responseBuilder4.obj(); + ASSERT_FALSE(response4.hasField("info")); + ASSERT_EQUALS(ourOpTime, OpTime(response4["opTime"].timestampValue())); + ASSERT_FALSE(response4["fresher"].Bool()); + ASSERT_TRUE(response4["veto"].Bool()); + ASSERT_EQUALS("h1:27017 has lower priority than hself:27017", response4["errmsg"].String()); + + + // Test trying to elect a node that isn't electable + args.id = 40; + args.who = HostAndPort("h3"); + + h3Info.setDownValues(now, ""); + topocoord.updateHeartbeatData(now++, h3Info, 40, ourOpTime); + + BSONObjBuilder responseBuilder5; + Status status5 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder5, &status5); + ASSERT_OK(status5); + BSONObj response5 = responseBuilder5.obj(); + ASSERT_FALSE(response5.hasField("info")); + ASSERT_EQUALS(ourOpTime, OpTime(response5["opTime"].timestampValue())); + ASSERT_FALSE(response5["fresher"].Bool()); + ASSERT_TRUE(response5["veto"].Bool()); + ASSERT_EQUALS("I don't think h3:27017 is electable", response5["errmsg"].String()); + + + // Finally, test trying to elect a valid node + args.id = 10; + args.who = HostAndPort("hself"); + + h3Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0, 0), ourOpTime, "", ""); + topocoord.updateHeartbeatData(now++, h3Info, 40, ourOpTime); + + BSONObjBuilder responseBuilder6; + Status status6 = internalErrorStatus; + topocoord.prepareFreshResponse(cbData, args, ourOpTime, &responseBuilder6, &status6); + ASSERT_OK(status6); + BSONObj response6 = responseBuilder6.obj(); + cout << response6.jsonString(TenGen, 1); + ASSERT_FALSE(response6.hasField("info")); + ASSERT_EQUALS(ourOpTime, OpTime(response6["opTime"].timestampValue())); + ASSERT_FALSE(response6["fresher"].Bool()); + ASSERT_FALSE(response6["veto"].Bool()); + ASSERT_FALSE(response6.hasField("errmsg")); + } } // namespace } // namespace repl } // namespace mongo |