diff options
Diffstat (limited to 'src/mongo/db/repl/freshness_checker.cpp')
-rw-r--r-- | src/mongo/db/repl/freshness_checker.cpp | 330 |
1 files changed, 158 insertions, 172 deletions
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 17a501f2ce4..ca1d665dd81 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -46,192 +46,178 @@ namespace mongo { namespace repl { - FreshnessChecker::Algorithm::Algorithm( - Timestamp lastOpTimeApplied, - const ReplicaSetConfig& rsConfig, - int selfIndex, - const std::vector<HostAndPort>& targets) : - _responsesProcessed(0), - _failedVoterResponses(0), - _lastOpTimeApplied(lastOpTimeApplied), - _rsConfig(rsConfig), - _selfIndex(selfIndex), - _targets(targets), - _votingTargets(0), - _losableVoters(0), - _myVote(0), - _abortReason(None) { - - // Count voting targets (since the targets could be a subset of members). - for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); - it != _targets.end(); - ++it) { - const MemberConfig* member = _rsConfig.findMemberByHostAndPort(*it); - if (member && member->isVoter()) - ++_votingTargets; - } - - _myVote = _rsConfig.getMemberAt(_selfIndex).isVoter() ? 1 : 0; - _losableVoters = std::max(0, - ((_votingTargets + _myVote) - _rsConfig.getMajorityVoteCount())); - - } - - FreshnessChecker::Algorithm::~Algorithm() {} - - std::vector<RemoteCommandRequest> - FreshnessChecker::Algorithm::getRequests() const { - const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); - - // gather all not-down nodes, get their fullnames(or hostandport's) - // schedule fresh command for each node - BSONObjBuilder freshCmdBuilder; - freshCmdBuilder.append("replSetFresh", 1); - freshCmdBuilder.append("set", _rsConfig.getReplSetName()); - freshCmdBuilder.append("opTime", Date_t::fromMillisSinceEpoch(_lastOpTimeApplied.asLL())); - freshCmdBuilder.append("who", selfConfig.getHostAndPort().toString()); - freshCmdBuilder.appendIntOrLL("cfgver", _rsConfig.getConfigVersion()); - freshCmdBuilder.append("id", selfConfig.getId()); - const BSONObj replSetFreshCmd = freshCmdBuilder.obj(); - - std::vector<RemoteCommandRequest> requests; - for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); - it != _targets.end(); - ++it) { - invariant(*it != selfConfig.getHostAndPort()); - requests.push_back(RemoteCommandRequest( - *it, - "admin", - replSetFreshCmd, - Milliseconds(30*1000))); // trying to match current Socket timeout - } - - return requests; +FreshnessChecker::Algorithm::Algorithm(Timestamp lastOpTimeApplied, + const ReplicaSetConfig& rsConfig, + int selfIndex, + const std::vector<HostAndPort>& targets) + : _responsesProcessed(0), + _failedVoterResponses(0), + _lastOpTimeApplied(lastOpTimeApplied), + _rsConfig(rsConfig), + _selfIndex(selfIndex), + _targets(targets), + _votingTargets(0), + _losableVoters(0), + _myVote(0), + _abortReason(None) { + // Count voting targets (since the targets could be a subset of members). + for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end(); + ++it) { + const MemberConfig* member = _rsConfig.findMemberByHostAndPort(*it); + if (member && member->isVoter()) + ++_votingTargets; } - bool FreshnessChecker::Algorithm::hadTooManyFailedVoterResponses() const { - const bool tooManyLostVoters = (_failedVoterResponses > _losableVoters); - - LOG(3) << "hadTooManyFailedVoterResponses(" << tooManyLostVoters << ") = " - << _failedVoterResponses << " failed responses <" - << " (" << _votingTargets << " total voters - " - << _rsConfig.getMajorityVoteCount() << " majority voters - me (" - << _myVote << ")) -- losableVotes: " << _losableVoters; - return tooManyLostVoters; + _myVote = _rsConfig.getMemberAt(_selfIndex).isVoter() ? 1 : 0; + _losableVoters = std::max(0, ((_votingTargets + _myVote) - _rsConfig.getMajorityVoteCount())); +} + +FreshnessChecker::Algorithm::~Algorithm() {} + +std::vector<RemoteCommandRequest> FreshnessChecker::Algorithm::getRequests() const { + const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); + + // gather all not-down nodes, get their fullnames(or hostandport's) + // schedule fresh command for each node + BSONObjBuilder freshCmdBuilder; + freshCmdBuilder.append("replSetFresh", 1); + freshCmdBuilder.append("set", _rsConfig.getReplSetName()); + freshCmdBuilder.append("opTime", Date_t::fromMillisSinceEpoch(_lastOpTimeApplied.asLL())); + freshCmdBuilder.append("who", selfConfig.getHostAndPort().toString()); + freshCmdBuilder.appendIntOrLL("cfgver", _rsConfig.getConfigVersion()); + freshCmdBuilder.append("id", selfConfig.getId()); + const BSONObj replSetFreshCmd = freshCmdBuilder.obj(); + + std::vector<RemoteCommandRequest> requests; + for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end(); + ++it) { + invariant(*it != selfConfig.getHostAndPort()); + requests.push_back(RemoteCommandRequest( + *it, + "admin", + replSetFreshCmd, + Milliseconds(30 * 1000))); // trying to match current Socket timeout } - bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const { - const MemberConfig* member = _rsConfig.findMemberByHostAndPort(hap); - invariant(member); - return member->isVoter(); - } - - void FreshnessChecker::Algorithm::processResponse( - const RemoteCommandRequest& request, - const ResponseStatus& response) { - ++_responsesProcessed; - bool votingMember = _isVotingMember(request.target); - - Status status = Status::OK(); - - if (!response.isOK() || - !((status = getStatusFromCommandResult(response.getValue().data)).isOK())) { - if (votingMember) { - ++_failedVoterResponses; - if (hadTooManyFailedVoterResponses()) { - _abortReason = QuorumUnreachable; - } - } - if (!response.isOK()) { // network/executor error - LOG(2) << "FreshnessChecker: Got failed response from " << request.target; - } - else { // command error, like unauth - LOG(2) << "FreshnessChecker: Got error response from " << request.target - << " :" << status; + return requests; +} + +bool FreshnessChecker::Algorithm::hadTooManyFailedVoterResponses() const { + const bool tooManyLostVoters = (_failedVoterResponses > _losableVoters); + + LOG(3) << "hadTooManyFailedVoterResponses(" << tooManyLostVoters + << ") = " << _failedVoterResponses << " failed responses <" + << " (" << _votingTargets << " total voters - " << _rsConfig.getMajorityVoteCount() + << " majority voters - me (" << _myVote << ")) -- losableVotes: " << _losableVoters; + return tooManyLostVoters; +} + +bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const { + const MemberConfig* member = _rsConfig.findMemberByHostAndPort(hap); + invariant(member); + return member->isVoter(); +} + +void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& request, + const ResponseStatus& response) { + ++_responsesProcessed; + bool votingMember = _isVotingMember(request.target); + + Status status = Status::OK(); + + if (!response.isOK() || + !((status = getStatusFromCommandResult(response.getValue().data)).isOK())) { + if (votingMember) { + ++_failedVoterResponses; + if (hadTooManyFailedVoterResponses()) { + _abortReason = QuorumUnreachable; } - return; } - - const BSONObj res = response.getValue().data; - - LOG(2) << "FreshnessChecker: Got response from " << request.target - << " of " << res; - - if (res["fresher"].trueValue()) { - log() << "not electing self, we are not freshest"; - _abortReason = FresherNodeFound; - return; - } - - if (res["opTime"].type() != mongo::Date) { - error() << "wrong type for opTime argument in replSetFresh response: " << - typeName(res["opTime"].type()); - _abortReason = FresherNodeFound; - return; - } - Timestamp remoteTime(res["opTime"].date()); - if (remoteTime == _lastOpTimeApplied) { - _abortReason = FreshnessTie; - } - if (remoteTime > _lastOpTimeApplied) { - // something really wrong (rogue command?) - _abortReason = FresherNodeFound; - return; - } - - if (res["veto"].trueValue()) { - BSONElement msg = res["errmsg"]; - if (msg.type() == String) { - log() << "not electing self, " << request.target.toString() << - " would veto with '" << msg.String() << "'"; - } - else { - log() << "not electing self, " << request.target.toString() << - " would veto"; - } - _abortReason = FresherNodeFound; - return; + if (!response.isOK()) { // network/executor error + LOG(2) << "FreshnessChecker: Got failed response from " << request.target; + } else { // command error, like unauth + LOG(2) << "FreshnessChecker: Got error response from " << request.target << " :" + << status; } + return; } - bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const { - return (_abortReason != None && _abortReason != FreshnessTie) || - (_responsesProcessed == static_cast<int>(_targets.size())); - } + const BSONObj res = response.getValue().data; - FreshnessChecker::ElectionAbortReason FreshnessChecker::Algorithm::shouldAbortElection() const { - return _abortReason; - } + LOG(2) << "FreshnessChecker: Got response from " << request.target << " of " << res; - FreshnessChecker::ElectionAbortReason FreshnessChecker::shouldAbortElection() const { - return _algorithm->shouldAbortElection(); + if (res["fresher"].trueValue()) { + log() << "not electing self, we are not freshest"; + _abortReason = FresherNodeFound; + return; } - long long FreshnessChecker::getOriginalConfigVersion() const { - return _originalConfigVersion; + if (res["opTime"].type() != mongo::Date) { + error() << "wrong type for opTime argument in replSetFresh response: " + << typeName(res["opTime"].type()); + _abortReason = FresherNodeFound; + return; } - - FreshnessChecker::FreshnessChecker() : _isCanceled(false) {} - FreshnessChecker::~FreshnessChecker() {} - - StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start( - ReplicationExecutor* executor, - const Timestamp& lastOpTimeApplied, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& targets, - const stdx::function<void ()>& onCompletion) { - - _originalConfigVersion = currentConfig.getConfigVersion(); - _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets)); - _runner.reset(new ScatterGatherRunner(_algorithm.get())); - return _runner->start(executor, onCompletion); + Timestamp remoteTime(res["opTime"].date()); + if (remoteTime == _lastOpTimeApplied) { + _abortReason = FreshnessTie; } - - void FreshnessChecker::cancel(ReplicationExecutor* executor) { - _isCanceled = true; - _runner->cancel(executor); + if (remoteTime > _lastOpTimeApplied) { + // something really wrong (rogue command?) + _abortReason = FresherNodeFound; + return; } -} // namespace repl -} // namespace mongo + if (res["veto"].trueValue()) { + BSONElement msg = res["errmsg"]; + if (msg.type() == String) { + log() << "not electing self, " << request.target.toString() << " would veto with '" + << msg.String() << "'"; + } else { + log() << "not electing self, " << request.target.toString() << " would veto"; + } + _abortReason = FresherNodeFound; + return; + } +} + +bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const { + return (_abortReason != None && _abortReason != FreshnessTie) || + (_responsesProcessed == static_cast<int>(_targets.size())); +} + +FreshnessChecker::ElectionAbortReason FreshnessChecker::Algorithm::shouldAbortElection() const { + return _abortReason; +} + +FreshnessChecker::ElectionAbortReason FreshnessChecker::shouldAbortElection() const { + return _algorithm->shouldAbortElection(); +} + +long long FreshnessChecker::getOriginalConfigVersion() const { + return _originalConfigVersion; +} + +FreshnessChecker::FreshnessChecker() : _isCanceled(false) {} +FreshnessChecker::~FreshnessChecker() {} + +StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start( + ReplicationExecutor* executor, + const Timestamp& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& targets, + const stdx::function<void()>& onCompletion) { + _originalConfigVersion = currentConfig.getConfigVersion(); + _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets)); + _runner.reset(new ScatterGatherRunner(_algorithm.get())); + return _runner->start(executor, onCompletion); +} + +void FreshnessChecker::cancel(ReplicationExecutor* executor) { + _isCanceled = true; + _runner->cancel(executor); +} + +} // namespace repl +} // namespace mongo |