summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2014-09-12 18:27:38 -0400
committerAndy Schwerin <schwerin@mongodb.com>2014-09-15 19:08:14 -0400
commitf082e861661501755b27eee6abf3a4e61e61fdc4 (patch)
tree6fb54bea6c1d4eadae487fcab16f1e3dd4c6f4f2
parentfac5571f67e23a1339e20e0621bab0ba2a6b7163 (diff)
downloadmongo-f082e861661501755b27eee6abf3a4e61e61fdc4.tar.gz
SERVER-15248 Convert QuorumChecker into a ScatterGatherAlgorithm.
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp310
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change_test.cpp16
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_test.cpp16
3 files changed, 119 insertions, 223 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index 6f707524493..d2ea0b2c497 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -32,8 +32,10 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
-#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/replica_set_config.h"
+#include "mongo/db/repl/scatter_gather_algorithm.h"
+#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -45,35 +47,12 @@ namespace {
*
* Usage: Construct a QuorumChecker, pass in a pointer to the configuration for which you're
* checking quorum, and the integer index of the member config representing the "executing"
- * node. Call "run", pass in an executor, and the returned status is the result of the quorum
- * check.
- *
- * Theory of operation:
- *
- * The QuorumChecker executes three kinds of callback in the executor's run loop.
- *
- * - _startQuorumCheck schedules heartbeats to all nodes except the local one, and initializes
- * the checker's response tabulation with information about the local node.
- *
- * - _onQuorumCheckHeartbeatResponse updates the response tabulation based on a response
- * or timeout from a remote node.
- *
- * - _onQuorumCheckComplete uses information in the tabulation to compute the final status
- * of the quorum check, sometime after _haveReceivedSufficientReplies becomes true.
- *
- * The thread executing QuorumChecker::run first performs some initial set up, required for it
- * to synchronize effectively with the thread running executor's event loop. It creates the
- * event that is used to trigger _onQuorumCheckComplete, and schedules that callback and the
- * _startQuorumCheck callbacks. It then waits for _onQuorumCheckComplete to return, at which
- * point it knows that _finalStatus is set to the proper response, and so can return.
- *
- * Some scheduled _onQuorumCheckHeartbeatResponse callbacks may not have executed before
- * QuorumChecker::run wakes up from waiting for the _onQuorumCheckComplete callback. Before
- * returning, QuorumChecker::run marks all callbacks it scheduled for cancelation, ensuring that
- * even if they run they will not access member variables of QuorumChecker. This makes it safe
- * to let QuorumChecker go out of scope as soon as QuorumChecker::run returns.
+ * node. Use ScatterGatherRunner or otherwise execute a scatter-gather procedure as described in
+ * the class comment for the ScatterGatherAlgorithm class. After
+ * hasReceivedSufficientResponses() returns true, you may call getFinalStatus() to get the
+ * result of the quorum check.
*/
- class QuorumChecker {
+ class QuorumChecker : public ScatterGatherAlgorithm {
MONGO_DISALLOW_COPYING(QuorumChecker);
public:
/**
@@ -83,168 +62,102 @@ namespace {
* "rsConfig" must stay in scope until QuorumChecker's destructor completes.
*/
QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex);
+ virtual ~QuorumChecker();
- /**
- * Executes the quorum check using "executor" for network and event loop operations.
- *
- * Returns Status::OK() if a quorum responded, or an error status otherwise.
- */
- Status run(ReplicationExecutor* executor);
+ virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const;
+ virtual void processResponse(
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response);
- private:
- /**
- * Initial callback run in the event loop, that schedules remote heartbeats and initializes
- * QuorumChecker state based on the member config of the current node.
- */
- void _startQuorumCheck(const ReplicationExecutor::CallbackData& cbData);
+ virtual bool hasReceivedSufficientResponses() const;
- /**
- * Callback that processes a single heartbeat response.
- */
- void _onQuorumCheckHeartbeatResponse(
- const ReplicationExecutor::RemoteCommandCallbackData& cbData,
- const int memberIndex);
+ Status getFinalStatus() const { return _finalStatus; }
+ private:
/**
* Callback that executes after _haveReceivedSufficientReplies() becomes true.
*
* Computes the quorum result based on responses received so far, stores it into
* _finalStatus, and enables QuorumChecker::run() to return.
*/
- void _onQuorumCheckComplete(const ReplicationExecutor::CallbackData& cbData);
+ void _onQuorumCheckComplete();
/**
* Updates the QuorumChecker state based on the data from a single heartbeat response.
- *
- * Updates state labeled (X) in the concurrency legend, below, and so only safe
- * to call from within executor thread callbacks.
*/
void _tabulateHeartbeatResponse(
- const ReplicationExecutor::RemoteCommandCallbackData& cbData,
- const int memberIndex);
-
- /**
- * Returns true if we've received enough responses to conclude the quorum check.
- *
- * Reads state labeled (X) in the concurrency legend, below, and so only safe
- * to call from within executor thread callbacks.
- */
- bool _haveReceivedSufficientReplies() const;
-
- /**
- * Signals the event that indicates that _haveReceivedSufficientReplies() is now true,
- * unless that event has already been signaled.
- *
- * Called from executor callbacks or the thread executing QuorumChecker::run.
- */
- void _signalSufficientResponsesReceived(ReplicationExecutor* executor);
-
- // Concurrency legend:
- // (R) Read-only during concurrent operation (after run() calls a schedule method).
- // (X) Only read and modified by executor callbacks.
- // (C) Written by the "startup" or "completion" callback, only, and read by client threads
- // that have waited for the "completion" callback to complete
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response);
// Pointer to the replica set configuration for which we're checking quorum.
- const ReplicaSetConfig* const _rsConfig; // (R)
+ const ReplicaSetConfig* const _rsConfig;
// Index of the local node's member configuration in _rsConfig.
- const int _myIndex; // (R)
-
- // Event signaled when _haveReceivedSufficientReplies() becomes true.
- ReplicationExecutor::EventHandle _sufficientResponsesReceived; // (R)
+ const int _myIndex;
// List of nodes believed to be down.
- std::vector<HostAndPort> _down; // (X)
+ std::vector<HostAndPort> _down;
// List of voting nodes that have responded affirmatively.
- std::vector<HostAndPort> _voters; // (X)
+ std::vector<HostAndPort> _voters;
// Total number of responses and timeouts processed.
- int _numResponses; // (X)
+ int _numResponses;
// Number of electable nodes that have responded affirmatively.
- int _numElectable; // (X)
+ int _numElectable;
// Set to a non-OK status if a response from a remote node indicates
// that the quorum check should definitely fail, such as because of
// a replica set name mismatch.
- Status _vetoStatus; // (X)
-
- // List of heartbeat callbacks scheduled by _startQuorumCheck and
- // canceled by _onQuorumCheckComplete.
- std::vector<ReplicationExecutor::CallbackHandle> _hbResponseCallbacks; // (X)
+ Status _vetoStatus;
// Final status of the quorum check, returned by run().
- Status _finalStatus; // (C)
+ Status _finalStatus;
};
QuorumChecker::QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex)
: _rsConfig(rsConfig),
_myIndex(myIndex),
- _numResponses(0),
+ _numResponses(1), // We "responded" to ourself already.
_numElectable(0),
_vetoStatus(Status::OK()),
_finalStatus(ErrorCodes::CallbackCanceled, "Quorum check canceled") {
invariant(myIndex < _rsConfig->getNumMembers());
- }
+ const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex);
- Status QuorumChecker::run(ReplicationExecutor* executor) {
- StatusWith<ReplicationExecutor::EventHandle> evh = executor->makeEvent();
- if (!evh.isOK()) {
- return evh.getStatus();
+ if (myConfig.isVoter()) {
+ _voters.push_back(myConfig.getHostAndPort());
}
- _sufficientResponsesReceived = evh.getValue();
-
- StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = executor->onEvent(
- _sufficientResponsesReceived,
- stdx::bind(&QuorumChecker::_onQuorumCheckComplete, this, stdx::placeholders::_1));
- if (!finishCheckCallback.isOK()) {
- _signalSufficientResponsesReceived(executor); // Clean up.
- return finishCheckCallback.getStatus();
+ if (myConfig.isElectable()) {
+ _numElectable = 1;
}
- StatusWith<ReplicationExecutor::CallbackHandle> startCheckCallback = executor->scheduleWork(
- stdx::bind(&QuorumChecker::_startQuorumCheck, this, stdx::placeholders::_1));
- if (!startCheckCallback.isOK()) {
- _signalSufficientResponsesReceived(executor); // Clean up.
- executor->cancel(finishCheckCallback.getValue()); // Clean up.
- return startCheckCallback.getStatus();
+ if (hasReceivedSufficientResponses()) {
+ _onQuorumCheckComplete();
}
-
- executor->wait(finishCheckCallback.getValue());
-
- return _finalStatus;
}
- void QuorumChecker::_startQuorumCheck(const ReplicationExecutor::CallbackData& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
+ QuorumChecker::~QuorumChecker() {}
+
+ std::vector<ReplicationExecutor::RemoteCommandRequest> QuorumChecker::getRequests() const {
const bool isInitialConfig = _rsConfig->getConfigVersion() == 1;
const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex);
- if (myConfig.isVoter()) {
- _voters.push_back(myConfig.getHostAndPort());
- }
- if (myConfig.isElectable()) {
- _numElectable = 1;
+ std::vector<ReplicationExecutor::RemoteCommandRequest> requests;
+ if (hasReceivedSufficientResponses()) {
+ return requests;
}
- _numResponses = 1; // We "responded" to ourself already.
- if (_haveReceivedSufficientReplies()) {
- _signalSufficientResponsesReceived(cbData.executor);
- }
-
- // TODO: Call a helper to make the request object.
- const BSONObj hbRequest = BSON(
- "replSetHeartbeat" << _rsConfig->getReplSetName() <<
- "v" << _rsConfig->getConfigVersion() <<
- "pv" << 1 <<
- "checkEmpty" << isInitialConfig <<
- "from" << myConfig.getHostAndPort().toString() <<
- "fromId" << myConfig.getId());
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setSetName(_rsConfig->getReplSetName());
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setConfigVersion(_rsConfig->getConfigVersion());
+ hbArgs.setCheckEmpty(isInitialConfig);
+ hbArgs.setSenderHost(myConfig.getHostAndPort());
+ hbArgs.setSenderId(myConfig.getId());
+ const BSONObj hbRequest = hbArgs.toBSON();
// Send a bunch of heartbeat requests.
// Schedule an operation when a "sufficient" number of them have completed, and use that
@@ -255,60 +168,27 @@ namespace {
// No need to check self for liveness or unreadiness.
continue;
}
- const StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- cbData.executor->scheduleRemoteCommand(
- ReplicationExecutor::RemoteCommandRequest(
- _rsConfig->getMemberAt(i).getHostAndPort(),
- "admin",
- hbRequest,
- _rsConfig->getHeartbeatTimeoutPeriodMillis()),
- stdx::bind(&QuorumChecker::_onQuorumCheckHeartbeatResponse,
- this,
- stdx::placeholders::_1,
- i));
- if (!cbh.isOK()) {
- _vetoStatus = cbh.getStatus();
- _signalSufficientResponsesReceived(cbData.executor);
- return;
- }
- _hbResponseCallbacks.push_back(cbh.getValue());
+ requests.push_back(ReplicationExecutor::RemoteCommandRequest(
+ _rsConfig->getMemberAt(i).getHostAndPort(),
+ "admin",
+ hbRequest,
+ _rsConfig->getHeartbeatTimeoutPeriodMillis()));
}
- }
-
- void QuorumChecker::_onQuorumCheckHeartbeatResponse(
- const ReplicationExecutor::RemoteCommandCallbackData& cbData,
- const int memberIndex) {
- if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
- // If this callback has been canceled, it's not safe to look at *this. However,
- // QuorumChecker::run has already returned or will without the information we
- // can provide here.
- return;
- }
+ return requests;
+ }
- _tabulateHeartbeatResponse(cbData, memberIndex);
+ void QuorumChecker::processResponse(
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response) {
- if (_haveReceivedSufficientReplies()) {
- _signalSufficientResponsesReceived(cbData.executor);
+ _tabulateHeartbeatResponse(request, response);
+ if (hasReceivedSufficientResponses()) {
+ _onQuorumCheckComplete();
}
}
- void QuorumChecker::_onQuorumCheckComplete(const ReplicationExecutor::CallbackData& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- // If this callback has been canceled, it's not safe to look at *this. However,
- // QuorumChecker::run has already returned or will without the information we
- // can provide here.
- return;
- }
-
- // Cancel all the heartbeat callbacks, so that they do not attempt to access QuorumChecker
- // state after this callback completes.
- std::for_each(_hbResponseCallbacks.begin(),
- _hbResponseCallbacks.end(),
- stdx::bind(&ReplicationExecutor::cancel,
- cbData.executor,
- stdx::placeholders::_1));
-
+ void QuorumChecker::_onQuorumCheckComplete() {
if (!_vetoStatus.isOK()) {
_finalStatus = _vetoStatus;
return;
@@ -351,20 +231,20 @@ namespace {
}
void QuorumChecker::_tabulateHeartbeatResponse(
- const ReplicationExecutor::RemoteCommandCallbackData& cbData,
- const int memberIndex) {
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response) {
++_numResponses;
- if (!cbData.response.isOK()) {
- warning() << "Failed to complete heartbeat request to " << cbData.request.target <<
- "; " << cbData.response.getStatus();
- _down.push_back(cbData.request.target);
+ if (!response.isOK()) {
+ warning() << "Failed to complete heartbeat request to " << request.target <<
+ "; " << response.getStatus();
+ _down.push_back(request.target);
return;
}
- BSONObj res = cbData.response.getValue().data;
+ BSONObj res = response.getValue().data;
if (res["mismatch"].trueValue()) {
std::string message = str::stream() << "Our set name did not match that of " <<
- cbData.request.target.toString();
+ request.target.toString();
_vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message);
warning() << message;
return;
@@ -373,7 +253,7 @@ namespace {
if (res["v"].numberInt() >= _rsConfig->getConfigVersion()) {
std::string message = str::stream() << "Our config version of " <<
_rsConfig->getConfigVersion() <<
- " is no larger than the version on " << cbData.request.target.toString() <<
+ " is no larger than the version on " << request.target.toString() <<
", which is " << res["v"].toString();
_vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message);
warning() << message;
@@ -381,22 +261,29 @@ namespace {
}
}
if (!res["ok"].trueValue()) {
- warning() << "Got error response on heartbeat request to " << cbData.request.target <<
+ warning() << "Got error response on heartbeat request to " << request.target <<
"; " << res;
- _down.push_back(_rsConfig->getMemberAt(memberIndex).getHostAndPort());
+ _down.push_back(request.target);
return;
}
- const MemberConfig& memberConfig = _rsConfig->getMemberAt(memberIndex);
- if (memberConfig.isElectable()) {
- ++_numElectable;
- }
- if (memberConfig.isVoter()) {
- _voters.push_back(cbData.request.target);
+ for (int i = 0; i < _rsConfig->getNumMembers(); ++i) {
+ const MemberConfig& memberConfig = _rsConfig->getMemberAt(i);
+ if (memberConfig.getHostAndPort() != request.target) {
+ continue;
+ }
+ if (memberConfig.isElectable()) {
+ ++_numElectable;
+ }
+ if (memberConfig.isVoter()) {
+ _voters.push_back(request.target);
+ }
+ return;
}
+ invariant(false);
}
- bool QuorumChecker::_haveReceivedSufficientReplies() const {
+ bool QuorumChecker::hasReceivedSufficientResponses() const {
if (!_vetoStatus.isOK() || _numResponses == _rsConfig->getNumMembers()) {
// Vetoed or everybody has responded. All done.
return true;
@@ -419,29 +306,32 @@ namespace {
return true;
}
- void QuorumChecker::_signalSufficientResponsesReceived(ReplicationExecutor* executor) {
- if (_sufficientResponsesReceived.isValid()) {
- executor->signalEvent(_sufficientResponsesReceived);
- _sufficientResponsesReceived = ReplicationExecutor::EventHandle();
+ Status checkQuorumGeneral(ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ const int myIndex) {
+ QuorumChecker checker(&rsConfig, myIndex);
+ ScatterGatherRunner runner(&checker);
+ Status status = runner.run(executor);
+ if (!status.isOK()) {
+ return status;
}
- }
+ return checker.getFinalStatus();
+ }
} // namespace
Status checkQuorumForInitiate(ReplicationExecutor* executor,
const ReplicaSetConfig& rsConfig,
const int myIndex) {
invariant(rsConfig.getConfigVersion() == 1);
- QuorumChecker checker(&rsConfig, myIndex);
- return checker.run(executor);
+ return checkQuorumGeneral(executor, rsConfig, myIndex);
}
Status checkQuorumForReconfig(ReplicationExecutor* executor,
const ReplicaSetConfig& rsConfig,
const int myIndex) {
invariant(rsConfig.getConfigVersion() > 1);
- QuorumChecker checker(&rsConfig, myIndex);
- return checker.run(executor);
+ return checkQuorumGeneral(executor, rsConfig, myIndex);
}
} // namespace repl
diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
index 802f569661d..be1b962c7ae 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/check_quorum_for_config_change.h"
#include "mongo/db/repl/network_interface_mock.h"
+#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
@@ -138,13 +139,14 @@ namespace {
const BSONObj makeHeartbeatRequest(const ReplicaSetConfig& rsConfig, int myConfigIndex) {
const MemberConfig& myConfig = rsConfig.getMemberAt(myConfigIndex);
- return BSON(
- "replSetHeartbeat" << rsConfig.getReplSetName() <<
- "v" << rsConfig.getConfigVersion() <<
- "pv" << 1 <<
- "checkEmpty" << (rsConfig.getConfigVersion() == 1) <<
- "from" << myConfig.getHostAndPort().toString() <<
- "fromId" << myConfig.getId());
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setSetName(rsConfig.getReplSetName());
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setConfigVersion(rsConfig.getConfigVersion());
+ hbArgs.setCheckEmpty(rsConfig.getConfigVersion() == 1);
+ hbArgs.setSenderHost(myConfig.getHostAndPort());
+ hbArgs.setSenderId(myConfig.getId());
+ return hbArgs.toBSON();
}
TEST_F(CheckQuorumForInitiate, QuorumCheckSuccessForFiveNodes) {
diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
index c04b8637707..3966e513caf 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/repl/repl_coordinator_external_state_mock.h"
#include "mongo/db/repl/repl_coordinator_impl.h"
#include "mongo/db/repl/repl_coordinator_test_fixture.h"
+#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
@@ -220,16 +221,19 @@ namespace {
BSON("_id" << 1 << "host" << "node2:54321"))),
&result1));
ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode());
+
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setSetName("mySet");
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setConfigVersion(1);
+ hbArgs.setCheckEmpty(true);
+ hbArgs.setSenderHost(HostAndPort("node1", 12345));
+ hbArgs.setSenderId(0);
getNetWithMap()->addResponse(
ReplicationExecutor::RemoteCommandRequest(
HostAndPort("node2", 54321),
"admin",
- BSON("replSetHeartbeat" << "mySet" <<
- "v" << 1 <<
- "pv" << 1 <<
- "checkEmpty" << true <<
- "from" << "node1:12345" <<
- "fromId" << 0)),
+ hbArgs.toBSON()),
StatusWith<BSONObj>(BSON("ok" << 1)));
ASSERT_OK(