diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2014-09-12 19:35:22 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2014-09-15 19:08:15 -0400 |
commit | f8a3f964eda2be32154dd8afbfad9bdf6283e82e (patch) | |
tree | 4acee5d534bbdf6bb681781618b6fb1e8f1c7fd0 | |
parent | f082e861661501755b27eee6abf3a4e61e61fdc4 (diff) | |
download | mongo-f8a3f964eda2be32154dd8afbfad9bdf6283e82e.tar.gz |
SERVER-15248 Use a ScatterGatherAlgorithm in ElectCmdRunner.
-rw-r--r-- | src/mongo/db/repl/elect_cmd_runner.cpp | 135 | ||||
-rw-r--r-- | src/mongo/db/repl/elect_cmd_runner.h | 77 | ||||
-rw-r--r-- | src/mongo/db/repl/elect_cmd_runner_test.cpp | 127 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl_elect.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp | 11 |
5 files changed, 178 insertions, 204 deletions
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index d4d1f419232..5afbeada778 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -36,111 +36,114 @@ #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/util/log.h" namespace mongo { namespace repl { - ElectCmdRunner::ElectCmdRunner() : _receivedVotes(0), - _actualResponses(0) { + ElectCmdRunner::Algorithm::Algorithm( + const ReplicaSetConfig& rsConfig, + int selfIndex, + const std::vector<HostAndPort>& targets, + long long round) + : _actualResponses(0), + _sufficientResponsesReceived(false), + _rsConfig(rsConfig), + _selfIndex(selfIndex), + _targets(targets), + _round(round) { + + // Vote for ourselves, first. + _receivedVotes = _rsConfig.getMemberAt(_selfIndex).getNumVotes(); } - Status ElectCmdRunner::start( - ReplicationExecutor* executor, - const ReplicationExecutor::EventHandle& evh, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts) { - - _sufficientResponsesReceived = evh; - - // We start with voting for ourselves, then request votes from other members. - const MemberConfig& selfConfig = currentConfig.getMemberAt(selfIndex); - _receivedVotes = selfConfig.getNumVotes(); - - BSONObj replSetElectCmd = BSON("replSetElect" << 1 << - "set" << currentConfig.getReplSetName() << - "who" << selfConfig.getHostAndPort().toString() << - "whoid" << selfConfig.getId() << - "cfgver" << currentConfig.getConfigVersion() << - "round" << static_cast<long long>(executor->nextRandomInt64( - std::numeric_limits<int64_t>::max()))); + ElectCmdRunner::Algorithm::~Algorithm() {} + + std::vector<ReplicationExecutor::RemoteCommandRequest> + ElectCmdRunner::Algorithm::getRequests() const { + + const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); + std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + const BSONObj replSetElectCmd = + BSON("replSetElect" << 1 << + "set" << _rsConfig.getReplSetName() << + "who" << selfConfig.getHostAndPort().toString() << + "whoid" << selfConfig.getId() << + "cfgver" << _rsConfig.getConfigVersion() << + "round" << _round); // Schedule a RemoteCommandRequest for each non-DOWN node - for (std::vector<HostAndPort>::const_iterator it = hosts.begin(); - it != hosts.end(); + for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); + it != _targets.end(); ++it) { - const StatusWith<ReplicationExecutor::CallbackHandle> cbh = - executor->scheduleRemoteCommand( - ReplicationExecutor::RemoteCommandRequest( + + invariant(*it != selfConfig.getHostAndPort()); + requests.push_back(ReplicationExecutor::RemoteCommandRequest( *it, "admin", replSetElectCmd, - Milliseconds(30*1000)), // trying to match current Socket timeout - stdx::bind(&ElectCmdRunner::_onReplSetElectResponse, - this, - stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18683, cbh.getStatus()); + Milliseconds(30*1000))); // trying to match current Socket timeout + } + + return requests; + } - _responseCallbacks.push_back(cbh.getValue()); + bool ElectCmdRunner::Algorithm::hasReceivedSufficientResponses() const { + if (_sufficientResponsesReceived) { + return true; } - - if (_responseCallbacks.size() == 0) { - _signalSufficientResponsesReceived(executor); + if (_receivedVotes >= _rsConfig.getMajorityNumber()) { + return true; } - - return Status::OK(); + if (_actualResponses == _targets.size()) { + return true; + } + return false; } - void ElectCmdRunner::_onReplSetElectResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData) { + void ElectCmdRunner::Algorithm::processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response) { + ++_actualResponses; - if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { - return; - } - if (cbData.response.isOK()) { - BSONObj res = cbData.response.getValue().data; + if (response.isOK()) { + BSONObj res = response.getValue().data; LOG(1) << "replSet elect res: " << res.toString(); BSONElement vote(res["vote"]); if (vote.type() != mongo::NumberInt) { error() << "wrong type for vote argument in replSetElect command: " << typeName(vote.type()); - _signalSufficientResponsesReceived(cbData.executor); + _sufficientResponsesReceived = true; return; } _receivedVotes += vote._numberInt(); } else { - warning() << "elect command to " << cbData.request.target.toString() << " failed: " << - cbData.response.getStatus(); + warning() << "elect command to " << request.target << " failed: " << + response.getStatus(); } - if (_actualResponses == _responseCallbacks.size()) { - _signalSufficientResponsesReceived(cbData.executor); - } } - void ElectCmdRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) { - if (_sufficientResponsesReceived.isValid()) { + ElectCmdRunner::ElectCmdRunner() {} + ElectCmdRunner::~ElectCmdRunner() {} - // Cancel any remaining command callbacks. - std::for_each(_responseCallbacks.begin(), - _responseCallbacks.end(), - stdx::bind(&ReplicationExecutor::cancel, - executor, - stdx::placeholders::_1)); + StatusWith<ReplicationExecutor::EventHandle> ElectCmdRunner::start( + ReplicationExecutor* executor, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& targets) { - executor->signalEvent(_sufficientResponsesReceived); - _sufficientResponsesReceived = ReplicationExecutor::EventHandle(); - } + const long long round(executor->nextRandomInt64(std::numeric_limits<int64_t>::max())); + _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, round)); + _runner.reset(new ScatterGatherRunner(_algorithm.get())); + return _runner->start(executor); } int ElectCmdRunner::getReceivedVotes() const { - return _receivedVotes; + return _algorithm->getReceivedVotes(); } } // namespace repl diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index 4879da252dd..67518f70fee 100644 --- a/src/mongo/db/repl/elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -28,10 +28,13 @@ #pragma once +#include <boost/scoped_ptr.hpp> #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_algorithm.h" namespace mongo { @@ -40,57 +43,67 @@ namespace mongo { namespace repl { class ReplicaSetConfig; - class MemberHeartbeatData; + class ScatterGatherRunner; class ElectCmdRunner { MONGO_DISALLOW_COPYING(ElectCmdRunner); public: + class Algorithm : public ScatterGatherAlgorithm { + public: + Algorithm(const ReplicaSetConfig& rsConfig, + int selfIndex, + const std::vector<HostAndPort>& targets, + long long round); + + virtual ~Algorithm(); + virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual void processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response); + virtual bool hasReceivedSufficientResponses() const; + + int getReceivedVotes() const { return _receivedVotes; } + + private: + // Tally of the number of received votes for this election. + int _receivedVotes; + + // Number of responses received so far. + size_t _actualResponses; + + bool _sufficientResponsesReceived; + + const ReplicaSetConfig _rsConfig; + const int _selfIndex; + const std::vector<HostAndPort> _targets; + const long long _round; + }; + ElectCmdRunner(); + ~ElectCmdRunner(); /** * Begins the process of sending replSetElect commands to all non-DOWN nodes * in currentConfig. - * evh can be used to schedule a callback when the process is complete. - * evh is guaranteed to be signaled if and only if this function returns Status::OK(). - **/ - Status start( + * + * Returned handle can be used to schedule a callback when the process is complete. + */ + StatusWith<ReplicationExecutor::EventHandle> start( ReplicationExecutor* executor, - const ReplicationExecutor::EventHandle& evh, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<HostAndPort>& hosts); + const std::vector<HostAndPort>& targets); /** * Returns the number of received votes. Only valid to call after - * the event handle supplied to start() has been signaled, which guarantees that - * _receivedVotes will no longer be touched by callbacks. + * the event handle returned from start() has been signaled, which guarantees that + * the vote count will no longer be touched by callbacks. */ int getReceivedVotes() const; private: - /** - * Callback that runs after a replSetElect command returns. - * Increments the _receivedVotes counter appropriately, and - * signals completion if we have received the last expected response. - */ - void _onReplSetElectResponse(const ReplicationExecutor::RemoteCommandCallbackData& cbData); - - /** - * Signals _sufficientResponsesReceived event, if it hasn't been already. - */ - void _signalSufficientResponsesReceived(ReplicationExecutor* executor); - - // Event used to signal completion of the ElectCmdRunner's commands. - ReplicationExecutor::EventHandle _sufficientResponsesReceived; - - // Vector of command callbacks scheduled by start(). - std::vector<ReplicationExecutor::CallbackHandle> _responseCallbacks; - - // Tally of the number of received votes for this election. - int _receivedVotes; - - // Number of responses received so far. - size_t _actualResponses; + boost::scoped_ptr<Algorithm> _algorithm; + boost::scoped_ptr<ScatterGatherRunner> _runner; }; } diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index a2bde9b3791..82ac70be3b1 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -49,26 +49,27 @@ namespace { class ElectCmdRunnerTest : public mongo::unittest::Test { public: - ElectCmdRunnerTest(); + void doTest(ElectCmdRunner* electCmdRunner, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts); + void electCmdRunnerRunner(const ReplicationExecutor::CallbackData& data, ElectCmdRunner* electCmdRunner, - const ReplicationExecutor::EventHandle& evh, + StatusWith<ReplicationExecutor::EventHandle>* evh, const ReplicaSetConfig& currentConfig, int selfIndex, const std::vector<HostAndPort>& hosts); - protected: + NetworkInterfaceMockWithMap* _net; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; - Status _lastStatus; private: void setUp(); void tearDown(); }; - ElectCmdRunnerTest::ElectCmdRunnerTest() : _lastStatus(Status::OK()) {} - void ElectCmdRunnerTest::setUp() { _net = new NetworkInterfaceMockWithMap; _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); @@ -103,72 +104,67 @@ namespace { // This is necessary because the run method must be scheduled in the Replication Executor // for correct concurrency operation. void ElectCmdRunnerTest::electCmdRunnerRunner( - const ReplicationExecutor::CallbackData& data, - ElectCmdRunner* electCmdRunner, - const ReplicationExecutor::EventHandle& evh, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts) { + const ReplicationExecutor::CallbackData& data, + ElectCmdRunner* electCmdRunner, + StatusWith<ReplicationExecutor::EventHandle>* evh, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts) { + invariant(data.status.isOK()); - _lastStatus = electCmdRunner->start(data.executor, - evh, - currentConfig, - selfIndex, - hosts); + *evh = electCmdRunner->start( + data.executor, + currentConfig, + selfIndex, + hosts); } - TEST_F(ElectCmdRunnerTest, OneNode) { - // Only one node in the config. - ReplicaSetConfig config = assertMakeRSConfig( - BSON("_id" << "rs0" << - "version" << 1 << - "members" << BSON_ARRAY( - BSON("_id" << 1 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - - Date_t now(0); - std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); + void ElectCmdRunnerTest::doTest(ElectCmdRunner* electCmdRunner, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts) { - ElectCmdRunner electCmdRunner; - - StatusWith<ReplicationExecutor::CallbackHandle> cbh = + StatusWith<ReplicationExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set"); + StatusWith<ReplicationExecutor::CallbackHandle> cbh = _executor->scheduleWork( stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, this, stdx::placeholders::_1, - &electCmdRunner, - evh.getValue(), - config, - 0, + electCmdRunner, + &evh, + currentConfig, + selfIndex, hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - + ASSERT_OK(evh.getStatus()); _executor->waitForEvent(evh.getValue()); + } + + TEST_F(ElectCmdRunnerTest, OneNode) { + // Only one node in the config. + const ReplicaSetConfig config = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1")))); - ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 1); + std::vector<HostAndPort> hosts; + ElectCmdRunner electCmdRunner; + doTest(&electCmdRunner, config, 0, hosts); + ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 1); } TEST_F(ElectCmdRunnerTest, TwoNodes) { // Two nodes, we are node h1. - ReplicaSetConfig config = assertMakeRSConfig( + const ReplicaSetConfig config = assertMakeRSConfig( BSON("_id" << "rs0" << "version" << 1 << "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj electRequest = makeElectRequest(config, 0); @@ -181,24 +177,8 @@ namespace { "round" << 380865962699346850ll))); ElectCmdRunner electCmdRunner; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, - this, - stdx::placeholders::_1, - &electCmdRunner, - evh.getValue(), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + doTest(&electCmdRunner, config, 0, hosts); ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 2); - } @@ -210,13 +190,8 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj electRequest = makeElectRequest(config, 0); @@ -229,30 +204,26 @@ namespace { true /* isBlocked */); ElectCmdRunner electCmdRunner; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = + StatusWith<ReplicationExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set"); + StatusWith<ReplicationExecutor::CallbackHandle> cbh = _executor->scheduleWork( stdx::bind(&ElectCmdRunnerTest::electCmdRunnerRunner, this, stdx::placeholders::_1, &electCmdRunner, - evh.getValue(), + &evh, config, 0, hosts)); ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - ASSERT_OK(_lastStatus); - + ASSERT_OK(evh.getStatus()); _executor->shutdown(); _net->unblockAll(); _executor->waitForEvent(evh.getValue()); - ASSERT_EQUALS(electCmdRunner.getReceivedVotes(), 1); - } - } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp index 87924d29a15..9317dc8b289 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp @@ -169,15 +169,18 @@ namespace repl { return; } - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = cbData.executor->makeEvent(); - if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { + _electCmdRunner.reset(new ElectCmdRunner); + StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start( + cbData.executor, + _rsConfig, + _thisMembersConfigIndex, + _topCoord->getMaybeUpHostAndPorts()); + if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; - } + } fassert(18685, nextPhaseEvh.getStatus()); - - _electCmdRunner.reset(new ElectCmdRunner); - StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = + StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = cbData.executor->onEvent( nextPhaseEvh.getValue(), stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, @@ -189,16 +192,6 @@ namespace repl { } fassert(18671, finishCheckCallback.getStatus()); - Status electionCompleteStatus = _electCmdRunner->start(cbData.executor, - nextPhaseEvh.getValue(), - _rsConfig, - _thisMembersConfigIndex, - _topCoord->getMaybeUpHostAndPorts()); - if (electionCompleteStatus == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(18686, electionCompleteStatus); - freshnessCheckerDeleter.Dismiss(); finishEvhGuard.Dismiss(); } @@ -222,11 +215,12 @@ namespace repl { int receivedVotes = _electCmdRunner->getReceivedVotes(); - if (receivedVotes * 2 <= _rsConfig.getMajorityVoteCount()) { - log() << "replSet couldn't elect self, only received " << receivedVotes << " votes"; + if (receivedVotes < _rsConfig.getMajorityVoteCount()) { + log() << "replSet couldn't elect self, only received " << receivedVotes << + " votes, but needed at least " << _rsConfig.getMajorityVoteCount(); return; } - + if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) { log() << "replSet config version changed during our election, ignoring result"; return; diff --git a/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp index 2ce982f3205..7f468b1a5e8 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp @@ -195,7 +195,7 @@ namespace { } TEST_F(ReplCoordElectTest, ElectNotEnoughVotes) { - // one responds with -10000 vote and we are not elected + // one responds with -10000 votes, and one doesn't respond, and we are not elected startCapturingLogMessages(); BSONObj configObj = BSON("_id" << "mySet" << "version" << 1 << @@ -219,17 +219,10 @@ namespace { "vote" << -10000 << "round" << kFirstRound))); - getNet()->addResponse(RemoteCommandRequest(HostAndPort("node3:12345"), - "admin", - electRequest), - StatusWith<BSONObj>(BSON("ok" << 1 << - "vote" << 1 << - "round" << kFirstRound))); - getReplCoord()->testElection(); stopCapturingLogMessages(); ASSERT_EQUALS(1, - countLogLinesContaining("replSet couldn't elect self, only received -9998 votes")); + countLogLinesContaining("replSet couldn't elect self, only received -9999 votes")); } TEST_F(ReplCoordElectTest, ElectWrongTypeForVote) { |