/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/check_quorum_for_config_change.h" #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/scatter_gather_algorithm.h" #include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace repl { using executor::RemoteCommandRequest; QuorumChecker::QuorumChecker(const ReplSetConfig* rsConfig, int myIndex) : _rsConfig(rsConfig), _myIndex(myIndex), _numResponses(1), // We "responded" to ourself already. _numElectable(0), _vetoStatus(Status::OK()), _finalStatus(ErrorCodes::CallbackCanceled, "Quorum check canceled") { invariant(myIndex < _rsConfig->getNumMembers()); const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex); if (myConfig.isVoter()) { _voters.push_back(myConfig.getHostAndPort()); } if (myConfig.isElectable()) { _numElectable = 1; } if (hasReceivedSufficientResponses()) { _onQuorumCheckComplete(); } } QuorumChecker::~QuorumChecker() {} std::vector QuorumChecker::getRequests() const { const bool isInitialConfig = _rsConfig->getConfigVersion() == 1; const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex); std::vector requests; if (hasReceivedSufficientResponses()) { return requests; } ReplSetHeartbeatArgs hbArgs; hbArgs.setSetName(_rsConfig->getReplSetName()); hbArgs.setProtocolVersion(1); hbArgs.setConfigVersion(_rsConfig->getConfigVersion()); hbArgs.setCheckEmpty(isInitialConfig); hbArgs.setSenderHost(myConfig.getHostAndPort()); hbArgs.setSenderId(myConfig.getId()); const BSONObj hbRequest = hbArgs.toBSON(); // Send a bunch of heartbeat requests. // Schedule an operation when a "sufficient" number of them have completed, and use that // to compute the quorum check results. // Wait for the "completion" callback to finish, and then it's OK to return the results. for (int i = 0; i < _rsConfig->getNumMembers(); ++i) { if (_myIndex == i) { // No need to check self for liveness or unreadiness. continue; } requests.push_back(RemoteCommandRequest(_rsConfig->getMemberAt(i).getHostAndPort(), "admin", hbRequest, BSON(rpc::kReplSetMetadataFieldName << 1), nullptr, _rsConfig->getHeartbeatTimeoutPeriodMillis())); } return requests; } void QuorumChecker::processResponse(const RemoteCommandRequest& request, const ResponseStatus& response) { _tabulateHeartbeatResponse(request, response); if (hasReceivedSufficientResponses()) { _onQuorumCheckComplete(); } } void QuorumChecker::_onQuorumCheckComplete() { if (!_vetoStatus.isOK()) { _finalStatus = _vetoStatus; return; } if (_rsConfig->getConfigVersion() == 1 && !_badResponses.empty()) { str::stream message; message << "replSetInitiate quorum check failed because not all proposed set members " "responded affirmatively: "; for (std::vector>::const_iterator it = _badResponses.begin(); it != _badResponses.end(); ++it) { if (it != _badResponses.begin()) { message << ", "; } message << it->first.toString() << " failed with " << it->second.reason(); } _finalStatus = Status(ErrorCodes::NodeNotFound, message); return; } if (_numElectable == 0) { _finalStatus = Status(ErrorCodes::NodeNotFound, "Quorum check failed because no " "electable nodes responded; at least one required for config"); return; } if (int(_voters.size()) < _rsConfig->getMajorityVoteCount()) { str::stream message; message << "Quorum check failed because not enough voting nodes responded; required " << _rsConfig->getMajorityVoteCount() << " but "; if (_voters.size() == 0) { message << "none responded"; } else { message << "only the following " << _voters.size() << " voting nodes responded: " << _voters.front().toString(); for (size_t i = 1; i < _voters.size(); ++i) { message << ", " << _voters[i].toString(); } } if (!_badResponses.empty()) { message << "; the following nodes did not respond affirmatively: "; for (std::vector>::const_iterator it = _badResponses.begin(); it != _badResponses.end(); ++it) { if (it != _badResponses.begin()) { message << ", "; } message << it->first.toString() << " failed with " << it->second.reason(); } } _finalStatus = Status(ErrorCodes::NodeNotFound, message); return; } _finalStatus = Status::OK(); } void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& request, const ResponseStatus& response) { ++_numResponses; if (!response.isOK()) { warning() << "Failed to complete heartbeat request to " << request.target << "; " << response.status; _badResponses.push_back(std::make_pair(request.target, response.status)); return; } BSONObj resBSON = response.data; ReplSetHeartbeatResponse hbResp; Status hbStatus = hbResp.initialize(resBSON, 0); if (hbStatus.code() == ErrorCodes::InconsistentReplicaSetNames) { std::string message = str::stream() << "Our set name did not match that of " << request.target.toString(); _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); warning() << message; return; } if (!hbStatus.isOK() && hbStatus != ErrorCodes::InvalidReplicaSetConfig) { warning() << "Got error (" << hbStatus << ") response on heartbeat request to " << request.target << "; " << hbResp; _badResponses.push_back(std::make_pair(request.target, hbStatus)); return; } if (!hbResp.getReplicaSetName().empty()) { if (hbResp.getConfigVersion() >= _rsConfig->getConfigVersion()) { std::string message = str::stream() << "Our config version of " << _rsConfig->getConfigVersion() << " is no larger than the version on " << request.target.toString() << ", which is " << hbResp.getConfigVersion(); _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); warning() << message; return; } } if (_rsConfig->hasReplicaSetId()) { StatusWith replMetadata = rpc::ReplSetMetadata::readFromMetadata(response.metadata); if (replMetadata.isOK() && replMetadata.getValue().getReplicaSetId().isSet() && _rsConfig->getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) { std::string message = str::stream() << "Our replica set ID of " << _rsConfig->getReplicaSetId() << " did not match that of " << request.target.toString() << ", which is " << replMetadata.getValue().getReplicaSetId(); _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); warning() << message; } } const bool isInitialConfig = _rsConfig->getConfigVersion() == 1; if (isInitialConfig && hbResp.hasData()) { std::string message = str::stream() << "'" << request.target.toString() << "' has data already, cannot initiate set."; _vetoStatus = Status(ErrorCodes::CannotInitializeNodeWithData, message); warning() << message; return; } for (int i = 0; i < _rsConfig->getNumMembers(); ++i) { const MemberConfig& memberConfig = _rsConfig->getMemberAt(i); if (memberConfig.getHostAndPort() != request.target) { continue; } if (memberConfig.isElectable()) { ++_numElectable; } if (memberConfig.isVoter()) { _voters.push_back(request.target); } return; } invariant(false); } bool QuorumChecker::hasReceivedSufficientResponses() const { if (!_vetoStatus.isOK() || _numResponses == _rsConfig->getNumMembers()) { // Vetoed or everybody has responded. All done. return true; } if (_rsConfig->getConfigVersion() == 1) { // Have not received responses from every member, and the proposed config // version is 1 (initial configuration). Keep waiting. return false; } if (_numElectable == 0) { // Have not heard from at least one electable node. Keep waiting. return false; } if (int(_voters.size()) < _rsConfig->getMajorityVoteCount()) { // Have not heard from a majority of voters. Keep waiting. return false; } // Have heard from a majority of voters and one electable node. All done. return true; } Status checkQuorumGeneral(ReplicationExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { QuorumChecker checker(&rsConfig, myIndex); ScatterGatherRunner runner(&checker, executor); Status status = runner.run(); if (!status.isOK()) { return status; } return checker.getFinalStatus(); } Status checkQuorumForInitiate(ReplicationExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { invariant(rsConfig.getConfigVersion() == 1); return checkQuorumGeneral(executor, rsConfig, myIndex); } Status checkQuorumForReconfig(ReplicationExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { invariant(rsConfig.getConfigVersion() > 1); return checkQuorumGeneral(executor, rsConfig, myIndex); } } // namespace repl } // namespace mongo