diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2014-09-12 19:35:22 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2014-09-15 19:08:15 -0400 |
commit | f8a3f964eda2be32154dd8afbfad9bdf6283e82e (patch) | |
tree | 4acee5d534bbdf6bb681781618b6fb1e8f1c7fd0 /src/mongo/db/repl/elect_cmd_runner.cpp | |
parent | f082e861661501755b27eee6abf3a4e61e61fdc4 (diff) | |
download | mongo-f8a3f964eda2be32154dd8afbfad9bdf6283e82e.tar.gz |
SERVER-15248 Use a ScatterGatherAlgorithm in ElectCmdRunner.
Diffstat (limited to 'src/mongo/db/repl/elect_cmd_runner.cpp')
-rw-r--r-- | src/mongo/db/repl/elect_cmd_runner.cpp | 135 |
1 files changed, 69 insertions, 66 deletions
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index d4d1f419232..5afbeada778 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -36,111 +36,114 @@ #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/util/log.h" namespace mongo { namespace repl { - ElectCmdRunner::ElectCmdRunner() : _receivedVotes(0), - _actualResponses(0) { + ElectCmdRunner::Algorithm::Algorithm( + const ReplicaSetConfig& rsConfig, + int selfIndex, + const std::vector<HostAndPort>& targets, + long long round) + : _actualResponses(0), + _sufficientResponsesReceived(false), + _rsConfig(rsConfig), + _selfIndex(selfIndex), + _targets(targets), + _round(round) { + + // Vote for ourselves, first. + _receivedVotes = _rsConfig.getMemberAt(_selfIndex).getNumVotes(); } - Status ElectCmdRunner::start( - ReplicationExecutor* executor, - const ReplicationExecutor::EventHandle& evh, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts) { - - _sufficientResponsesReceived = evh; - - // We start with voting for ourselves, then request votes from other members. - const MemberConfig& selfConfig = currentConfig.getMemberAt(selfIndex); - _receivedVotes = selfConfig.getNumVotes(); - - BSONObj replSetElectCmd = BSON("replSetElect" << 1 << - "set" << currentConfig.getReplSetName() << - "who" << selfConfig.getHostAndPort().toString() << - "whoid" << selfConfig.getId() << - "cfgver" << currentConfig.getConfigVersion() << - "round" << static_cast<long long>(executor->nextRandomInt64( - std::numeric_limits<int64_t>::max()))); + ElectCmdRunner::Algorithm::~Algorithm() {} + + std::vector<ReplicationExecutor::RemoteCommandRequest> + ElectCmdRunner::Algorithm::getRequests() const { + + const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); + std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + const BSONObj replSetElectCmd = + BSON("replSetElect" << 1 << + "set" << _rsConfig.getReplSetName() << + "who" << selfConfig.getHostAndPort().toString() << + "whoid" << selfConfig.getId() << + "cfgver" << _rsConfig.getConfigVersion() << + "round" << _round); // Schedule a RemoteCommandRequest for each non-DOWN node - for (std::vector<HostAndPort>::const_iterator it = hosts.begin(); - it != hosts.end(); + for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); + it != _targets.end(); ++it) { - const StatusWith<ReplicationExecutor::CallbackHandle> cbh = - executor->scheduleRemoteCommand( - ReplicationExecutor::RemoteCommandRequest( + + invariant(*it != selfConfig.getHostAndPort()); + requests.push_back(ReplicationExecutor::RemoteCommandRequest( *it, "admin", replSetElectCmd, - Milliseconds(30*1000)), // trying to match current Socket timeout - stdx::bind(&ElectCmdRunner::_onReplSetElectResponse, - this, - stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18683, cbh.getStatus()); + Milliseconds(30*1000))); // trying to match current Socket timeout + } + + return requests; + } - _responseCallbacks.push_back(cbh.getValue()); + bool ElectCmdRunner::Algorithm::hasReceivedSufficientResponses() const { + if (_sufficientResponsesReceived) { + return true; } - - if (_responseCallbacks.size() == 0) { - _signalSufficientResponsesReceived(executor); + if (_receivedVotes >= _rsConfig.getMajorityNumber()) { + return true; } - - return Status::OK(); + if (_actualResponses == _targets.size()) { + return true; + } + return false; } - void ElectCmdRunner::_onReplSetElectResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData) { + void ElectCmdRunner::Algorithm::processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response) { + ++_actualResponses; - if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { - return; - } - if (cbData.response.isOK()) { - BSONObj res = cbData.response.getValue().data; + if (response.isOK()) { + BSONObj res = response.getValue().data; LOG(1) << "replSet elect res: " << res.toString(); BSONElement vote(res["vote"]); if (vote.type() != mongo::NumberInt) { error() << "wrong type for vote argument in replSetElect command: " << typeName(vote.type()); - _signalSufficientResponsesReceived(cbData.executor); + _sufficientResponsesReceived = true; return; } _receivedVotes += vote._numberInt(); } else { - warning() << "elect command to " << cbData.request.target.toString() << " failed: " << - cbData.response.getStatus(); + warning() << "elect command to " << request.target << " failed: " << + response.getStatus(); } - if (_actualResponses == _responseCallbacks.size()) { - _signalSufficientResponsesReceived(cbData.executor); - } } - void ElectCmdRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) { - if (_sufficientResponsesReceived.isValid()) { + ElectCmdRunner::ElectCmdRunner() {} + ElectCmdRunner::~ElectCmdRunner() {} - // Cancel any remaining command callbacks. - std::for_each(_responseCallbacks.begin(), - _responseCallbacks.end(), - stdx::bind(&ReplicationExecutor::cancel, - executor, - stdx::placeholders::_1)); + StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start( + ReplicationExecutor* executor, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& targets) { - executor->signalEvent(_sufficientResponsesReceived); - _sufficientResponsesReceived = ReplicationExecutor::EventHandle(); - } + const long long round(executor->nextRandomInt64(std::numeric_limits<int64_t>::max())); + _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, round)); + _runner.reset(new ScatterGatherRunner(_algorithm.get())); + return _runner->start(executor); } int ElectCmdRunner::getReceivedVotes() const { - return _receivedVotes; + return _algorithm->getReceivedVotes(); } } // namespace repl |