summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/elect_cmd_runner.cpp
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2014-09-12 19:35:22 -0400
committerAndy Schwerin <schwerin@mongodb.com>2014-09-15 19:08:15 -0400
commitf8a3f964eda2be32154dd8afbfad9bdf6283e82e (patch)
tree4acee5d534bbdf6bb681781618b6fb1e8f1c7fd0 /src/mongo/db/repl/elect_cmd_runner.cpp
parentf082e861661501755b27eee6abf3a4e61e61fdc4 (diff)
downloadmongo-f8a3f964eda2be32154dd8afbfad9bdf6283e82e.tar.gz
SERVER-15248 Use a ScatterGatherAlgorithm in ElectCmdRunner.
Diffstat (limited to 'src/mongo/db/repl/elect_cmd_runner.cpp')
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp135
1 files changed, 69 insertions, 66 deletions
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index d4d1f419232..5afbeada778 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -36,111 +36,114 @@
#include "mongo/db/repl/member_heartbeat_data.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
- ElectCmdRunner::ElectCmdRunner() : _receivedVotes(0),
- _actualResponses(0) {
+ ElectCmdRunner::Algorithm::Algorithm(
+ const ReplicaSetConfig& rsConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets,
+ long long round)
+ : _actualResponses(0),
+ _sufficientResponsesReceived(false),
+ _rsConfig(rsConfig),
+ _selfIndex(selfIndex),
+ _targets(targets),
+ _round(round) {
+
+ // Vote for ourselves, first.
+ _receivedVotes = _rsConfig.getMemberAt(_selfIndex).getNumVotes();
}
- Status ElectCmdRunner::start(
- ReplicationExecutor* executor,
- const ReplicationExecutor::EventHandle& evh,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& hosts) {
-
- _sufficientResponsesReceived = evh;
-
- // We start with voting for ourselves, then request votes from other members.
- const MemberConfig& selfConfig = currentConfig.getMemberAt(selfIndex);
- _receivedVotes = selfConfig.getNumVotes();
-
- BSONObj replSetElectCmd = BSON("replSetElect" << 1 <<
- "set" << currentConfig.getReplSetName() <<
- "who" << selfConfig.getHostAndPort().toString() <<
- "whoid" << selfConfig.getId() <<
- "cfgver" << currentConfig.getConfigVersion() <<
- "round" << static_cast<long long>(executor->nextRandomInt64(
- std::numeric_limits<int64_t>::max())));
+ ElectCmdRunner::Algorithm::~Algorithm() {}
+
+ std::vector<ReplicationExecutor::RemoteCommandRequest>
+ ElectCmdRunner::Algorithm::getRequests() const {
+
+ const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
+ std::vector<ReplicationExecutor::RemoteCommandRequest> requests;
+ const BSONObj replSetElectCmd =
+ BSON("replSetElect" << 1 <<
+ "set" << _rsConfig.getReplSetName() <<
+ "who" << selfConfig.getHostAndPort().toString() <<
+ "whoid" << selfConfig.getId() <<
+ "cfgver" << _rsConfig.getConfigVersion() <<
+ "round" << _round);
// Schedule a RemoteCommandRequest for each non-DOWN node
- for (std::vector<HostAndPort>::const_iterator it = hosts.begin();
- it != hosts.end();
+ for (std::vector<HostAndPort>::const_iterator it = _targets.begin();
+ it != _targets.end();
++it) {
- const StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- executor->scheduleRemoteCommand(
- ReplicationExecutor::RemoteCommandRequest(
+
+ invariant(*it != selfConfig.getHostAndPort());
+ requests.push_back(ReplicationExecutor::RemoteCommandRequest(
*it,
"admin",
replSetElectCmd,
- Milliseconds(30*1000)), // trying to match current Socket timeout
- stdx::bind(&ElectCmdRunner::_onReplSetElectResponse,
- this,
- stdx::placeholders::_1));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- fassert(18683, cbh.getStatus());
+ Milliseconds(30*1000))); // trying to match current Socket timeout
+ }
+
+ return requests;
+ }
- _responseCallbacks.push_back(cbh.getValue());
+ bool ElectCmdRunner::Algorithm::hasReceivedSufficientResponses() const {
+ if (_sufficientResponsesReceived) {
+ return true;
}
-
- if (_responseCallbacks.size() == 0) {
- _signalSufficientResponsesReceived(executor);
+ if (_receivedVotes >= _rsConfig.getMajorityNumber()) {
+ return true;
}
-
- return Status::OK();
+ if (_actualResponses == _targets.size()) {
+ return true;
+ }
+ return false;
}
- void ElectCmdRunner::_onReplSetElectResponse(
- const ReplicationExecutor::RemoteCommandCallbackData& cbData) {
+ void ElectCmdRunner::Algorithm::processResponse(
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response) {
+
++_actualResponses;
- if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
- return;
- }
- if (cbData.response.isOK()) {
- BSONObj res = cbData.response.getValue().data;
+ if (response.isOK()) {
+ BSONObj res = response.getValue().data;
LOG(1) << "replSet elect res: " << res.toString();
BSONElement vote(res["vote"]);
if (vote.type() != mongo::NumberInt) {
error() << "wrong type for vote argument in replSetElect command: " <<
typeName(vote.type());
- _signalSufficientResponsesReceived(cbData.executor);
+ _sufficientResponsesReceived = true;
return;
}
_receivedVotes += vote._numberInt();
}
else {
- warning() << "elect command to " << cbData.request.target.toString() << " failed: " <<
- cbData.response.getStatus();
+ warning() << "elect command to " << request.target << " failed: " <<
+ response.getStatus();
}
- if (_actualResponses == _responseCallbacks.size()) {
- _signalSufficientResponsesReceived(cbData.executor);
- }
}
- void ElectCmdRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) {
- if (_sufficientResponsesReceived.isValid()) {
+ ElectCmdRunner::ElectCmdRunner() {}
+ ElectCmdRunner::~ElectCmdRunner() {}
- // Cancel any remaining command callbacks.
- std::for_each(_responseCallbacks.begin(),
- _responseCallbacks.end(),
- stdx::bind(&ReplicationExecutor::cancel,
- executor,
- stdx::placeholders::_1));
+ StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start(
+ ReplicationExecutor* executor,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets) {
- executor->signalEvent(_sufficientResponsesReceived);
- _sufficientResponsesReceived = ReplicationExecutor::EventHandle();
- }
+ const long long round(executor->nextRandomInt64(std::numeric_limits<int64_t>::max()));
+ _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, round));
+ _runner.reset(new ScatterGatherRunner(_algorithm.get()));
+ return _runner->start(executor);
}
int ElectCmdRunner::getReceivedVotes() const {
- return _receivedVotes;
+ return _algorithm->getReceivedVotes();
}
} // namespace repl