diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2014-09-12 18:27:38 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2014-09-15 19:08:14 -0400 |
commit | f082e861661501755b27eee6abf3a4e61e61fdc4 (patch) | |
tree | 6fb54bea6c1d4eadae487fcab16f1e3dd4c6f4f2 | |
parent | fac5571f67e23a1339e20e0621bab0ba2a6b7163 (diff) | |
download | mongo-f082e861661501755b27eee6abf3a4e61e61fdc4.tar.gz |
SERVER-15248 Convert QuorumChecker into a ScatterGatherAlgorithm.
-rw-r--r-- | src/mongo/db/repl/check_quorum_for_config_change.cpp | 310 | ||||
-rw-r--r-- | src/mongo/db/repl/check_quorum_for_config_change_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl_test.cpp | 16 |
3 files changed, 119 insertions, 223 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index 6f707524493..d2ea0b2c497 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -32,8 +32,10 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" -#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/scatter_gather_algorithm.h" +#include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -45,35 +47,12 @@ namespace { * * Usage: Construct a QuorumChecker, pass in a pointer to the configuration for which you're * checking quorum, and the integer index of the member config representing the "executing" - * node. Call "run", pass in an executor, and the returned status is the result of the quorum - * check. - * - * Theory of operation: - * - * The QuorumChecker executes three kinds of callback in the executor's run loop. - * - * - _startQuorumCheck schedules heartbeats to all nodes except the local one, and initializes - * the checker's response tabulation with information about the local node. - * - * - _onQuorumCheckHeartbeatResponse updates the response tabulation based on a response - * or timeout from a remote node. - * - * - _onQuorumCheckComplete uses information in the tabulation to compute the final status - * of the quorum check, sometime after _haveReceivedSufficientReplies becomes true. - * - * The thread executing QuorumChecker::run first performs some initial set up, required for it - * to synchronize effectively with the thread running executor's event loop. It creates the - * event that is used to trigger _onQuorumCheckComplete, and schedules that callback and the - * _startQuorumCheck callbacks. 
It then waits for _onQuorumCheckComplete to return, at which - * point it knows that _finalStatus is set to the proper response, and so can return. - * - * Some scheduled _onQuorumCheckHeartbeatResponse callbacks may not have executed before - * QuorumChecker::run wakes up from waiting for the _onQuorumCheckComplete callback. Before - * returning, QuorumChecker::run marks all callbacks it scheduled for cancelation, ensuring that - * even if they run they will not access member variables of QuorumChecker. This makes it safe - * to let QuorumChecker go out of scope as soon as QuorumChecker::run returns. + * node. Use ScatterGatherRunner or otherwise execute a scatter-gather procedure as described in + * the class comment for the ScatterGatherAlgorithm class. After + * hasReceivedSufficientResponses() returns true, you may call getFinalStatus() to get the + * result of the quorum check. */ - class QuorumChecker { + class QuorumChecker : public ScatterGatherAlgorithm { MONGO_DISALLOW_COPYING(QuorumChecker); public: /** @@ -83,168 +62,102 @@ namespace { * "rsConfig" must stay in scope until QuorumChecker's destructor completes. */ QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex); + virtual ~QuorumChecker(); - /** - * Executes the quorum check using "executor" for network and event loop operations. - * - * Returns Status::OK() if a quorum responded, or an error status otherwise. - */ - Status run(ReplicationExecutor* executor); + virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual void processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response); - private: - /** - * Initial callback run in the event loop, that schedules remote heartbeats and initializes - * QuorumChecker state based on the member config of the current node. 
- */ - void _startQuorumCheck(const ReplicationExecutor::CallbackData& cbData); + virtual bool hasReceivedSufficientResponses() const; - /** - * Callback that processes a single heartbeat response. - */ - void _onQuorumCheckHeartbeatResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData, - const int memberIndex); + Status getFinalStatus() const { return _finalStatus; } + private: /** * Callback that executes after _haveReceivedSufficientReplies() becomes true. * * Computes the quorum result based on responses received so far, stores it into * _finalStatus, and enables QuorumChecker::run() to return. */ - void _onQuorumCheckComplete(const ReplicationExecutor::CallbackData& cbData); + void _onQuorumCheckComplete(); /** * Updates the QuorumChecker state based on the data from a single heartbeat response. - * - * Updates state labeled (X) in the concurrency legend, below, and so only safe - * to call from within executor thread callbacks. */ void _tabulateHeartbeatResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData, - const int memberIndex); - - /** - * Returns true if we've received enough responses to conclude the quorum check. - * - * Reads state labeled (X) in the concurrency legend, below, and so only safe - * to call from within executor thread callbacks. - */ - bool _haveReceivedSufficientReplies() const; - - /** - * Signals the event that indicates that _haveReceivedSufficientReplies() is now true, - * unless that event has already been signaled. - * - * Called from executor callbacks or the thread executing QuorumChecker::run. - */ - void _signalSufficientResponsesReceived(ReplicationExecutor* executor); - - // Concurrency legend: - // (R) Read-only during concurrent operation (after run() calls a schedule method). - // (X) Only read and modified by executor callbacks. 
- // (C) Written by the "startup" or "completion" callback, only, and read by client threads - // that have waited for the "completion" callback to complete + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response); // Pointer to the replica set configuration for which we're checking quorum. - const ReplicaSetConfig* const _rsConfig; // (R) + const ReplicaSetConfig* const _rsConfig; // Index of the local node's member configuration in _rsConfig. - const int _myIndex; // (R) - - // Event signaled when _haveReceivedSufficientReplies() becomes true. - ReplicationExecutor::EventHandle _sufficientResponsesReceived; // (R) + const int _myIndex; // List of nodes believed to be down. - std::vector<HostAndPort> _down; // (X) + std::vector<HostAndPort> _down; // List of voting nodes that have responded affirmatively. - std::vector<HostAndPort> _voters; // (X) + std::vector<HostAndPort> _voters; // Total number of responses and timeouts processed. - int _numResponses; // (X) + int _numResponses; // Number of electable nodes that have responded affirmatively. - int _numElectable; // (X) + int _numElectable; // Set to a non-OK status if a response from a remote node indicates // that the quorum check should definitely fail, such as because of // a replica set name mismatch. - Status _vetoStatus; // (X) - - // List of heartbeat callbacks scheduled by _startQuorumCheck and - // canceled by _onQuorumCheckComplete. - std::vector<ReplicationExecutor::CallbackHandle> _hbResponseCallbacks; // (X) + Status _vetoStatus; // Final status of the quorum check, returned by run(). - Status _finalStatus; // (C) + Status _finalStatus; }; QuorumChecker::QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex) : _rsConfig(rsConfig), _myIndex(myIndex), - _numResponses(0), + _numResponses(1), // We "responded" to ourself already. 
_numElectable(0), _vetoStatus(Status::OK()), _finalStatus(ErrorCodes::CallbackCanceled, "Quorum check canceled") { invariant(myIndex < _rsConfig->getNumMembers()); - } + const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex); - Status QuorumChecker::run(ReplicationExecutor* executor) { - StatusWith<ReplicationExecutor::EventHandle> evh = executor->makeEvent(); - if (!evh.isOK()) { - return evh.getStatus(); + if (myConfig.isVoter()) { + _voters.push_back(myConfig.getHostAndPort()); } - _sufficientResponsesReceived = evh.getValue(); - - StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = executor->onEvent( - _sufficientResponsesReceived, - stdx::bind(&QuorumChecker::_onQuorumCheckComplete, this, stdx::placeholders::_1)); - if (!finishCheckCallback.isOK()) { - _signalSufficientResponsesReceived(executor); // Clean up. - return finishCheckCallback.getStatus(); + if (myConfig.isElectable()) { + _numElectable = 1; } - StatusWith<ReplicationExecutor::CallbackHandle> startCheckCallback = executor->scheduleWork( - stdx::bind(&QuorumChecker::_startQuorumCheck, this, stdx::placeholders::_1)); - if (!startCheckCallback.isOK()) { - _signalSufficientResponsesReceived(executor); // Clean up. - executor->cancel(finishCheckCallback.getValue()); // Clean up. 
- return startCheckCallback.getStatus(); + if (hasReceivedSufficientResponses()) { + _onQuorumCheckComplete(); } - - executor->wait(finishCheckCallback.getValue()); - - return _finalStatus; } - void QuorumChecker::_startQuorumCheck(const ReplicationExecutor::CallbackData& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } + QuorumChecker::~QuorumChecker() {} + + std::vector<ReplicationExecutor::RemoteCommandRequest> QuorumChecker::getRequests() const { const bool isInitialConfig = _rsConfig->getConfigVersion() == 1; const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex); - if (myConfig.isVoter()) { - _voters.push_back(myConfig.getHostAndPort()); - } - if (myConfig.isElectable()) { - _numElectable = 1; + std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + if (hasReceivedSufficientResponses()) { + return requests; } - _numResponses = 1; // We "responded" to ourself already. - if (_haveReceivedSufficientReplies()) { - _signalSufficientResponsesReceived(cbData.executor); - } - - // TODO: Call a helper to make the request object. - const BSONObj hbRequest = BSON( - "replSetHeartbeat" << _rsConfig->getReplSetName() << - "v" << _rsConfig->getConfigVersion() << - "pv" << 1 << - "checkEmpty" << isInitialConfig << - "from" << myConfig.getHostAndPort().toString() << - "fromId" << myConfig.getId()); + ReplSetHeartbeatArgs hbArgs; + hbArgs.setSetName(_rsConfig->getReplSetName()); + hbArgs.setProtocolVersion(1); + hbArgs.setConfigVersion(_rsConfig->getConfigVersion()); + hbArgs.setCheckEmpty(isInitialConfig); + hbArgs.setSenderHost(myConfig.getHostAndPort()); + hbArgs.setSenderId(myConfig.getId()); + const BSONObj hbRequest = hbArgs.toBSON(); // Send a bunch of heartbeat requests. // Schedule an operation when a "sufficient" number of them have completed, and use that @@ -255,60 +168,27 @@ namespace { // No need to check self for liveness or unreadiness. 
continue; } - const StatusWith<ReplicationExecutor::CallbackHandle> cbh = - cbData.executor->scheduleRemoteCommand( - ReplicationExecutor::RemoteCommandRequest( - _rsConfig->getMemberAt(i).getHostAndPort(), - "admin", - hbRequest, - _rsConfig->getHeartbeatTimeoutPeriodMillis()), - stdx::bind(&QuorumChecker::_onQuorumCheckHeartbeatResponse, - this, - stdx::placeholders::_1, - i)); - if (!cbh.isOK()) { - _vetoStatus = cbh.getStatus(); - _signalSufficientResponsesReceived(cbData.executor); - return; - } - _hbResponseCallbacks.push_back(cbh.getValue()); + requests.push_back(ReplicationExecutor::RemoteCommandRequest( + _rsConfig->getMemberAt(i).getHostAndPort(), + "admin", + hbRequest, + _rsConfig->getHeartbeatTimeoutPeriodMillis())); } - } - - void QuorumChecker::_onQuorumCheckHeartbeatResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData, - const int memberIndex) { - if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { - // If this callback has been canceled, it's not safe to look at *this. However, - // QuorumChecker::run has already returned or will without the information we - // can provide here. - return; - } + return requests; + } - _tabulateHeartbeatResponse(cbData, memberIndex); + void QuorumChecker::processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response) { - if (_haveReceivedSufficientReplies()) { - _signalSufficientResponsesReceived(cbData.executor); + _tabulateHeartbeatResponse(request, response); + if (hasReceivedSufficientResponses()) { + _onQuorumCheckComplete(); } } - void QuorumChecker::_onQuorumCheckComplete(const ReplicationExecutor::CallbackData& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - // If this callback has been canceled, it's not safe to look at *this. However, - // QuorumChecker::run has already returned or will without the information we - // can provide here. 
- return; - } - - // Cancel all the heartbeat callbacks, so that they do not attempt to access QuorumChecker - // state after this callback completes. - std::for_each(_hbResponseCallbacks.begin(), - _hbResponseCallbacks.end(), - stdx::bind(&ReplicationExecutor::cancel, - cbData.executor, - stdx::placeholders::_1)); - + void QuorumChecker::_onQuorumCheckComplete() { if (!_vetoStatus.isOK()) { _finalStatus = _vetoStatus; return; @@ -351,20 +231,20 @@ namespace { } void QuorumChecker::_tabulateHeartbeatResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData, - const int memberIndex) { + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response) { ++_numResponses; - if (!cbData.response.isOK()) { - warning() << "Failed to complete heartbeat request to " << cbData.request.target << - "; " << cbData.response.getStatus(); - _down.push_back(cbData.request.target); + if (!response.isOK()) { + warning() << "Failed to complete heartbeat request to " << request.target << + "; " << response.getStatus(); + _down.push_back(request.target); return; } - BSONObj res = cbData.response.getValue().data; + BSONObj res = response.getValue().data; if (res["mismatch"].trueValue()) { std::string message = str::stream() << "Our set name did not match that of " << - cbData.request.target.toString(); + request.target.toString(); _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); warning() << message; return; @@ -373,7 +253,7 @@ namespace { if (res["v"].numberInt() >= _rsConfig->getConfigVersion()) { std::string message = str::stream() << "Our config version of " << _rsConfig->getConfigVersion() << - " is no larger than the version on " << cbData.request.target.toString() << + " is no larger than the version on " << request.target.toString() << ", which is " << res["v"].toString(); _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); warning() << message; @@ -381,22 +261,29 @@ 
namespace { } } if (!res["ok"].trueValue()) { - warning() << "Got error response on heartbeat request to " << cbData.request.target << + warning() << "Got error response on heartbeat request to " << request.target << "; " << res; - _down.push_back(_rsConfig->getMemberAt(memberIndex).getHostAndPort()); + _down.push_back(request.target); return; } - const MemberConfig& memberConfig = _rsConfig->getMemberAt(memberIndex); - if (memberConfig.isElectable()) { - ++_numElectable; - } - if (memberConfig.isVoter()) { - _voters.push_back(cbData.request.target); + for (int i = 0; i < _rsConfig->getNumMembers(); ++i) { + const MemberConfig& memberConfig = _rsConfig->getMemberAt(i); + if (memberConfig.getHostAndPort() != request.target) { + continue; + } + if (memberConfig.isElectable()) { + ++_numElectable; + } + if (memberConfig.isVoter()) { + _voters.push_back(request.target); + } + return; } + invariant(false); } - bool QuorumChecker::_haveReceivedSufficientReplies() const { + bool QuorumChecker::hasReceivedSufficientResponses() const { if (!_vetoStatus.isOK() || _numResponses == _rsConfig->getNumMembers()) { // Vetoed or everybody has responded. All done. 
return true; @@ -419,29 +306,32 @@ namespace { return true; } - void QuorumChecker::_signalSufficientResponsesReceived(ReplicationExecutor* executor) { - if (_sufficientResponsesReceived.isValid()) { - executor->signalEvent(_sufficientResponsesReceived); - _sufficientResponsesReceived = ReplicationExecutor::EventHandle(); + Status checkQuorumGeneral(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + const int myIndex) { + QuorumChecker checker(&rsConfig, myIndex); + ScatterGatherRunner runner(&checker); + Status status = runner.run(executor); + if (!status.isOK()) { + return status; } - } + return checker.getFinalStatus(); + } } // namespace Status checkQuorumForInitiate(ReplicationExecutor* executor, const ReplicaSetConfig& rsConfig, const int myIndex) { invariant(rsConfig.getConfigVersion() == 1); - QuorumChecker checker(&rsConfig, myIndex); - return checker.run(executor); + return checkQuorumGeneral(executor, rsConfig, myIndex); } Status checkQuorumForReconfig(ReplicationExecutor* executor, const ReplicaSetConfig& rsConfig, const int myIndex) { invariant(rsConfig.getConfigVersion() > 1); - QuorumChecker checker(&rsConfig, myIndex); - return checker.run(executor); + return checkQuorumGeneral(executor, rsConfig, myIndex); } } // namespace repl diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index 802f569661d..be1b962c7ae 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/check_quorum_for_config_change.h" #include "mongo/db/repl/network_interface_mock.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" @@ -138,13 +139,14 @@ namespace { const BSONObj makeHeartbeatRequest(const 
ReplicaSetConfig& rsConfig, int myConfigIndex) { const MemberConfig& myConfig = rsConfig.getMemberAt(myConfigIndex); - return BSON( - "replSetHeartbeat" << rsConfig.getReplSetName() << - "v" << rsConfig.getConfigVersion() << - "pv" << 1 << - "checkEmpty" << (rsConfig.getConfigVersion() == 1) << - "from" << myConfig.getHostAndPort().toString() << - "fromId" << myConfig.getId()); + ReplSetHeartbeatArgs hbArgs; + hbArgs.setSetName(rsConfig.getReplSetName()); + hbArgs.setProtocolVersion(1); + hbArgs.setConfigVersion(rsConfig.getConfigVersion()); + hbArgs.setCheckEmpty(rsConfig.getConfigVersion() == 1); + hbArgs.setSenderHost(myConfig.getHostAndPort()); + hbArgs.setSenderId(myConfig.getId()); + return hbArgs.toBSON(); } TEST_F(CheckQuorumForInitiate, QuorumCheckSuccessForFiveNodes) { diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp index c04b8637707..3966e513caf 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/repl_coordinator_external_state_mock.h" #include "mongo/db/repl/repl_coordinator_impl.h" #include "mongo/db/repl/repl_coordinator_test_fixture.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/topology_coordinator_impl.h" @@ -220,16 +221,19 @@ namespace { BSON("_id" << 1 << "host" << "node2:54321"))), &result1)); ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode()); + + ReplSetHeartbeatArgs hbArgs; + hbArgs.setSetName("mySet"); + hbArgs.setProtocolVersion(1); + hbArgs.setConfigVersion(1); + hbArgs.setCheckEmpty(true); + hbArgs.setSenderHost(HostAndPort("node1", 12345)); + hbArgs.setSenderId(0); getNetWithMap()->addResponse( ReplicationExecutor::RemoteCommandRequest( HostAndPort("node2", 54321), "admin", - BSON("replSetHeartbeat" << "mySet" << - 
"v" << 1 << - "pv" << 1 << - "checkEmpty" << true << - "from" << "node1:12345" << - "fromId" << 0)), + hbArgs.toBSON()), StatusWith<BSONObj>(BSON("ok" << 1))); ASSERT_OK( |