summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/freshness_checker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/freshness_checker.cpp')
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp330
1 files changed, 158 insertions, 172 deletions
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index 17a501f2ce4..ca1d665dd81 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -46,192 +46,178 @@
namespace mongo {
namespace repl {
- FreshnessChecker::Algorithm::Algorithm(
- Timestamp lastOpTimeApplied,
- const ReplicaSetConfig& rsConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets) :
- _responsesProcessed(0),
- _failedVoterResponses(0),
- _lastOpTimeApplied(lastOpTimeApplied),
- _rsConfig(rsConfig),
- _selfIndex(selfIndex),
- _targets(targets),
- _votingTargets(0),
- _losableVoters(0),
- _myVote(0),
- _abortReason(None) {
-
- // Count voting targets (since the targets could be a subset of members).
- for (std::vector<HostAndPort>::const_iterator it = _targets.begin();
- it != _targets.end();
- ++it) {
- const MemberConfig* member = _rsConfig.findMemberByHostAndPort(*it);
- if (member && member->isVoter())
- ++_votingTargets;
- }
-
- _myVote = _rsConfig.getMemberAt(_selfIndex).isVoter() ? 1 : 0;
- _losableVoters = std::max(0,
- ((_votingTargets + _myVote) - _rsConfig.getMajorityVoteCount()));
-
- }
-
- FreshnessChecker::Algorithm::~Algorithm() {}
-
- std::vector<RemoteCommandRequest>
- FreshnessChecker::Algorithm::getRequests() const {
- const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
-
- // gather all not-down nodes, get their fullnames(or hostandport's)
- // schedule fresh command for each node
- BSONObjBuilder freshCmdBuilder;
- freshCmdBuilder.append("replSetFresh", 1);
- freshCmdBuilder.append("set", _rsConfig.getReplSetName());
- freshCmdBuilder.append("opTime", Date_t::fromMillisSinceEpoch(_lastOpTimeApplied.asLL()));
- freshCmdBuilder.append("who", selfConfig.getHostAndPort().toString());
- freshCmdBuilder.appendIntOrLL("cfgver", _rsConfig.getConfigVersion());
- freshCmdBuilder.append("id", selfConfig.getId());
- const BSONObj replSetFreshCmd = freshCmdBuilder.obj();
-
- std::vector<RemoteCommandRequest> requests;
- for (std::vector<HostAndPort>::const_iterator it = _targets.begin();
- it != _targets.end();
- ++it) {
- invariant(*it != selfConfig.getHostAndPort());
- requests.push_back(RemoteCommandRequest(
- *it,
- "admin",
- replSetFreshCmd,
- Milliseconds(30*1000))); // trying to match current Socket timeout
- }
-
- return requests;
+FreshnessChecker::Algorithm::Algorithm(Timestamp lastOpTimeApplied,
+ const ReplicaSetConfig& rsConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets)
+ : _responsesProcessed(0),
+ _failedVoterResponses(0),
+ _lastOpTimeApplied(lastOpTimeApplied),
+ _rsConfig(rsConfig),
+ _selfIndex(selfIndex),
+ _targets(targets),
+ _votingTargets(0),
+ _losableVoters(0),
+ _myVote(0),
+ _abortReason(None) {
+ // Count voting targets (since the targets could be a subset of members).
+ for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end();
+ ++it) {
+ const MemberConfig* member = _rsConfig.findMemberByHostAndPort(*it);
+ if (member && member->isVoter())
+ ++_votingTargets;
}
- bool FreshnessChecker::Algorithm::hadTooManyFailedVoterResponses() const {
- const bool tooManyLostVoters = (_failedVoterResponses > _losableVoters);
-
- LOG(3) << "hadTooManyFailedVoterResponses(" << tooManyLostVoters << ") = "
- << _failedVoterResponses << " failed responses <"
- << " (" << _votingTargets << " total voters - "
- << _rsConfig.getMajorityVoteCount() << " majority voters - me ("
- << _myVote << ")) -- losableVotes: " << _losableVoters;
- return tooManyLostVoters;
+ _myVote = _rsConfig.getMemberAt(_selfIndex).isVoter() ? 1 : 0;
+ _losableVoters = std::max(0, ((_votingTargets + _myVote) - _rsConfig.getMajorityVoteCount()));
+}
+
+FreshnessChecker::Algorithm::~Algorithm() {}
+
+std::vector<RemoteCommandRequest> FreshnessChecker::Algorithm::getRequests() const {
+ const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
+
+ // gather all not-down nodes, get their fullnames(or hostandport's)
+ // schedule fresh command for each node
+ BSONObjBuilder freshCmdBuilder;
+ freshCmdBuilder.append("replSetFresh", 1);
+ freshCmdBuilder.append("set", _rsConfig.getReplSetName());
+ freshCmdBuilder.append("opTime", Date_t::fromMillisSinceEpoch(_lastOpTimeApplied.asLL()));
+ freshCmdBuilder.append("who", selfConfig.getHostAndPort().toString());
+ freshCmdBuilder.appendIntOrLL("cfgver", _rsConfig.getConfigVersion());
+ freshCmdBuilder.append("id", selfConfig.getId());
+ const BSONObj replSetFreshCmd = freshCmdBuilder.obj();
+
+ std::vector<RemoteCommandRequest> requests;
+ for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end();
+ ++it) {
+ invariant(*it != selfConfig.getHostAndPort());
+ requests.push_back(RemoteCommandRequest(
+ *it,
+ "admin",
+ replSetFreshCmd,
+ Milliseconds(30 * 1000))); // trying to match current Socket timeout
}
- bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const {
- const MemberConfig* member = _rsConfig.findMemberByHostAndPort(hap);
- invariant(member);
- return member->isVoter();
- }
-
- void FreshnessChecker::Algorithm::processResponse(
- const RemoteCommandRequest& request,
- const ResponseStatus& response) {
- ++_responsesProcessed;
- bool votingMember = _isVotingMember(request.target);
-
- Status status = Status::OK();
-
- if (!response.isOK() ||
- !((status = getStatusFromCommandResult(response.getValue().data)).isOK())) {
- if (votingMember) {
- ++_failedVoterResponses;
- if (hadTooManyFailedVoterResponses()) {
- _abortReason = QuorumUnreachable;
- }
- }
- if (!response.isOK()) { // network/executor error
- LOG(2) << "FreshnessChecker: Got failed response from " << request.target;
- }
- else { // command error, like unauth
- LOG(2) << "FreshnessChecker: Got error response from " << request.target
- << " :" << status;
+ return requests;
+}
+
+bool FreshnessChecker::Algorithm::hadTooManyFailedVoterResponses() const {
+ const bool tooManyLostVoters = (_failedVoterResponses > _losableVoters);
+
+ LOG(3) << "hadTooManyFailedVoterResponses(" << tooManyLostVoters
+ << ") = " << _failedVoterResponses << " failed responses <"
+ << " (" << _votingTargets << " total voters - " << _rsConfig.getMajorityVoteCount()
+ << " majority voters - me (" << _myVote << ")) -- losableVotes: " << _losableVoters;
+ return tooManyLostVoters;
+}
+
+bool FreshnessChecker::Algorithm::_isVotingMember(const HostAndPort hap) const {
+ const MemberConfig* member = _rsConfig.findMemberByHostAndPort(hap);
+ invariant(member);
+ return member->isVoter();
+}
+
+void FreshnessChecker::Algorithm::processResponse(const RemoteCommandRequest& request,
+ const ResponseStatus& response) {
+ ++_responsesProcessed;
+ bool votingMember = _isVotingMember(request.target);
+
+ Status status = Status::OK();
+
+ if (!response.isOK() ||
+ !((status = getStatusFromCommandResult(response.getValue().data)).isOK())) {
+ if (votingMember) {
+ ++_failedVoterResponses;
+ if (hadTooManyFailedVoterResponses()) {
+ _abortReason = QuorumUnreachable;
}
- return;
}
-
- const BSONObj res = response.getValue().data;
-
- LOG(2) << "FreshnessChecker: Got response from " << request.target
- << " of " << res;
-
- if (res["fresher"].trueValue()) {
- log() << "not electing self, we are not freshest";
- _abortReason = FresherNodeFound;
- return;
- }
-
- if (res["opTime"].type() != mongo::Date) {
- error() << "wrong type for opTime argument in replSetFresh response: " <<
- typeName(res["opTime"].type());
- _abortReason = FresherNodeFound;
- return;
- }
- Timestamp remoteTime(res["opTime"].date());
- if (remoteTime == _lastOpTimeApplied) {
- _abortReason = FreshnessTie;
- }
- if (remoteTime > _lastOpTimeApplied) {
- // something really wrong (rogue command?)
- _abortReason = FresherNodeFound;
- return;
- }
-
- if (res["veto"].trueValue()) {
- BSONElement msg = res["errmsg"];
- if (msg.type() == String) {
- log() << "not electing self, " << request.target.toString() <<
- " would veto with '" << msg.String() << "'";
- }
- else {
- log() << "not electing self, " << request.target.toString() <<
- " would veto";
- }
- _abortReason = FresherNodeFound;
- return;
+ if (!response.isOK()) { // network/executor error
+ LOG(2) << "FreshnessChecker: Got failed response from " << request.target;
+ } else { // command error, like unauth
+ LOG(2) << "FreshnessChecker: Got error response from " << request.target << " :"
+ << status;
}
+ return;
}
- bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const {
- return (_abortReason != None && _abortReason != FreshnessTie) ||
- (_responsesProcessed == static_cast<int>(_targets.size()));
- }
+ const BSONObj res = response.getValue().data;
- FreshnessChecker::ElectionAbortReason FreshnessChecker::Algorithm::shouldAbortElection() const {
- return _abortReason;
- }
+ LOG(2) << "FreshnessChecker: Got response from " << request.target << " of " << res;
- FreshnessChecker::ElectionAbortReason FreshnessChecker::shouldAbortElection() const {
- return _algorithm->shouldAbortElection();
+ if (res["fresher"].trueValue()) {
+ log() << "not electing self, we are not freshest";
+ _abortReason = FresherNodeFound;
+ return;
}
- long long FreshnessChecker::getOriginalConfigVersion() const {
- return _originalConfigVersion;
+ if (res["opTime"].type() != mongo::Date) {
+ error() << "wrong type for opTime argument in replSetFresh response: "
+ << typeName(res["opTime"].type());
+ _abortReason = FresherNodeFound;
+ return;
}
-
- FreshnessChecker::FreshnessChecker() : _isCanceled(false) {}
- FreshnessChecker::~FreshnessChecker() {}
-
- StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start(
- ReplicationExecutor* executor,
- const Timestamp& lastOpTimeApplied,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& targets,
- const stdx::function<void ()>& onCompletion) {
-
- _originalConfigVersion = currentConfig.getConfigVersion();
- _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets));
- _runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor, onCompletion);
+ Timestamp remoteTime(res["opTime"].date());
+ if (remoteTime == _lastOpTimeApplied) {
+ _abortReason = FreshnessTie;
}
-
- void FreshnessChecker::cancel(ReplicationExecutor* executor) {
- _isCanceled = true;
- _runner->cancel(executor);
+ if (remoteTime > _lastOpTimeApplied) {
+ // something really wrong (rogue command?)
+ _abortReason = FresherNodeFound;
+ return;
}
-} // namespace repl
-} // namespace mongo
+ if (res["veto"].trueValue()) {
+ BSONElement msg = res["errmsg"];
+ if (msg.type() == String) {
+ log() << "not electing self, " << request.target.toString() << " would veto with '"
+ << msg.String() << "'";
+ } else {
+ log() << "not electing self, " << request.target.toString() << " would veto";
+ }
+ _abortReason = FresherNodeFound;
+ return;
+ }
+}
+
+bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const {
+ return (_abortReason != None && _abortReason != FreshnessTie) ||
+ (_responsesProcessed == static_cast<int>(_targets.size()));
+}
+
+FreshnessChecker::ElectionAbortReason FreshnessChecker::Algorithm::shouldAbortElection() const {
+ return _abortReason;
+}
+
+FreshnessChecker::ElectionAbortReason FreshnessChecker::shouldAbortElection() const {
+ return _algorithm->shouldAbortElection();
+}
+
+long long FreshnessChecker::getOriginalConfigVersion() const {
+ return _originalConfigVersion;
+}
+
+FreshnessChecker::FreshnessChecker() : _isCanceled(false) {}
+FreshnessChecker::~FreshnessChecker() {}
+
+StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start(
+ ReplicationExecutor* executor,
+ const Timestamp& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets,
+ const stdx::function<void()>& onCompletion) {
+ _originalConfigVersion = currentConfig.getConfigVersion();
+ _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets));
+ _runner.reset(new ScatterGatherRunner(_algorithm.get()));
+ return _runner->start(executor, onCompletion);
+}
+
+void FreshnessChecker::cancel(ReplicationExecutor* executor) {
+ _isCanceled = true;
+ _runner->cancel(executor);
+}
+
+} // namespace repl
+} // namespace mongo