diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2014-08-06 13:54:00 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2014-08-06 13:54:59 -0400 |
commit | 5aa26641648876ee10dcfc8ff7256dd46efc8b9c (patch) | |
tree | 434c4a367628fe2ce236221d9a37e85ffe8c7d4f /src/mongo | |
parent | b6f7a03697f160c965b467e8962f7f29934935c4 (diff) | |
download | mongo-5aa26641648876ee10dcfc8ff7256dd46efc8b9c.tar.gz |
SERVER-14626 Implement quorum check functionality for use in replica set initiate and reconfig.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/SConscript | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/check_quorum_for_config_change.cpp | 448 | ||||
-rw-r--r-- | src/mongo/db/repl/check_quorum_for_config_change.h | 77 | ||||
-rw-r--r-- | src/mongo/db/repl/check_quorum_for_config_change_test.cpp | 545 | ||||
-rw-r--r-- | src/mongo/db/repl/replica_set_config.h | 9 |
5 files changed, 1088 insertions, 0 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index f41e05b9b61..f71ad884d35 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -35,6 +35,7 @@ env.Library('topology_coordinator_impl', env.Library('repl_coordinator_impl', [ + 'check_quorum_for_config_change.cpp', 'repl_coordinator_impl.cpp', 'replica_set_config_checks.cpp', ], @@ -60,6 +61,14 @@ env.CppUnitTest('replica_set_config_checks_test', 'replmocks' ]) +env.CppUnitTest('check_quorum_for_config_change_test', + 'check_quorum_for_config_change_test.cpp', + LIBDEPS=[ + 'repl_coordinator_impl', + 'replication_executor', + 'replmocks', + ]) + env.Library('repl_coordinator_interface', ['repl_coordinator.cpp', 'repl_coordinator_external_state.cpp',], diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp new file mode 100644 index 00000000000..fa52cf2d094 --- /dev/null +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -0,0 +1,448 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/check_quorum_for_config_change.h" + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { +namespace { + /** + * Quorum checking state machine. + * + * Usage: Construct a QuorumChecker, pass in a pointer to the configuration for which you're + * checking quorum, and the integer index of the member config representing the "executing" + * node. Call "run", pass in an executor, and the returned status is the result of the quorum + * check. + * + * Theory of operation: + * + * The QuorumChecker executes three kinds of callback in the executor's run loop. + * + * - _startQuorumCheck schedules heartbeats to all nodes except the local one, and initializes + * the checker's response tabulation with information about the local node. + * + * - _onQuorumCheckHeartbeatResponse updates the response tabulation based on a response + * or timeout from a remote node. + * + * - _onQuorumCheckComplete uses information in the tabulation to compute the final status + * of the quorum check, sometime after _haveReceivedSufficientReplies becomes true. 
+ * + * The thread executing QuorumChecker::run first performs some initial set up, required for it + * to synchronize effectively with the thread running executor's event loop. It creates the + * event that is used to trigger _onQuorumCheckComplete, and schedules that callback and the + * _startQuorumCheck callbacks. It then waits for _onQuorumCheckComplete to return, at which + * point it knows that _finalStatus is set to the proper response, and so can return. + * + * Some scheduled _onQuorumCheckHeartbeatResponse callbacks may not have executed before + * QuorumChecker::run wakes up from waiting for the _onQuorumCheckComplete callback. Before + * returning, QuorumChecker::run marks all callbacks it scheduled for cancelation, ensuring that + * even if they run they will not access member variables of QuorumChecker. This makes it safe + * to let QuorumChecker go out of scope as soon as QuorumChecker::run returns. + */ + class QuorumChecker { + MONGO_DISALLOW_COPYING(QuorumChecker); + public: + /** + * Constructs a QuorumChecker that is used to confirm that sufficient nodes are up to accept + * "rsConfig". "myIndex" is the index of the local node, which is assumed to be up. + * + * "rsConfig" must stay in scope until QuorumChecker's destructor completes. + */ + QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex); + + /** + * Executes the quorum check using "executor" for network and event loop operations. + * + * Returns Status::OK() if a quorum responded, or an error status otherwise. + */ + Status run(ReplicationExecutor* executor); + + private: + /** + * Initial callback run in the event loop, that schedules remote heartbeats and initializes + * QuorumChecker state based on the member config of the current node. + */ + void _startQuorumCheck(const ReplicationExecutor::CallbackData& cbData); + + /** + * Callback that processes a single heartbeat response. 
+ */ + void _onQuorumCheckHeartbeatResponse( + const ReplicationExecutor::RemoteCommandCallbackData& cbData, + const int memberIndex); + + /** + * Callback that executes after _haveReceivedSufficientReplies() becomes true. + * + * Computes the quorum result based on responses received so far, stores it into + * _finalStatus, and enables QuorumChecker::run() to return. + */ + void _onQuorumCheckComplete(const ReplicationExecutor::CallbackData& cbData); + + /** + * Updates the QuorumChecker state based on the data from a single heartbeat response. + * + * Updates state labeled (X) in the concurrency legend, below, and so only safe + * to call from within executor thread callbacks. + */ + void _tabulateHeartbeatResponse( + const ReplicationExecutor::RemoteCommandCallbackData& cbData, + const int memberIndex); + + /** + * Returns true if we've received enough responses to conclude the quorum check. + * + * Reads state labeled (X) in the concurrency legend, below, and so only safe + * to call from within executor thread callbacks. + */ + bool _haveReceivedSufficientReplies() const; + + /** + * Signals the event that indicates that _haveReceivedSufficientReplies() is now true, + * unless that event has already been signaled. + * + * Called from executor callbacks or the thread executing QuorumChecker::run. + */ + void _signalSufficientResponsesReceived(ReplicationExecutor* executor); + + // Concurrency legend: + // (R) Read-only during concurrent operation (after run() calls a schedule method). + // (X) Only read and modified by executor callbacks. + // (C) Written by the "startup" or "completion" callback, only, and read by client threads + // that have waited for the "completion" callback to complete + + // Pointer to the replica set configuration for which we're checking quorum. + const ReplicaSetConfig* const _rsConfig; // (R) + + // Index of the local node's member configuration in _rsConfig. 
+ const int _myIndex; // (R) + + // Event signaled when _haveReceivedSufficientReplies() becomes true. + ReplicationExecutor::EventHandle _sufficientResponsesReceived; // (R) + + // List of nodes believed to be down. + std::vector<HostAndPort> _down; // (X) + + // List of voting nodes that have responded affirmatively. + std::vector<HostAndPort> _voters; // (X) + + // Total number of responses and timeouts processed. + int _numResponses; // (X) + + // Number of electable nodes that have responded affirmatively. + int _numElectable; // (X) + + // Set to a non-OK status if a response from a remote node indicates + // that the quorum check should definitely fail, such as because of + // a replica set name mismatch. + Status _vetoStatus; // (X) + + // List of heartbeat callbacks scheduled by _startQuorumCheck and + // canceled by _onQuorumCheckComplete. + std::vector<ReplicationExecutor::CallbackHandle> _hbResponseCallbacks; // (X) + + // Final status of the quorum check, returned by run(). + Status _finalStatus; // (C) + }; + + QuorumChecker::QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex) + : _rsConfig(rsConfig), + _myIndex(myIndex), + _numResponses(0), + _numElectable(0), + _vetoStatus(Status::OK()), + _finalStatus(ErrorCodes::CallbackCanceled, "Quorum check canceled") { + + invariant(myIndex < _rsConfig->getNumMembers()); + } + + Status QuorumChecker::run(ReplicationExecutor* executor) { + StatusWith<ReplicationExecutor::EventHandle> evh = executor->makeEvent(); + if (!evh.isOK()) { + return evh.getStatus(); + } + _sufficientResponsesReceived = evh.getValue(); + + StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = executor->onEvent( + _sufficientResponsesReceived, + stdx::bind(&QuorumChecker::_onQuorumCheckComplete, this, stdx::placeholders::_1)); + if (!finishCheckCallback.isOK()) { + _signalSufficientResponsesReceived(executor); // Clean up. 
+ return finishCheckCallback.getStatus(); + } + + StatusWith<ReplicationExecutor::CallbackHandle> startCheckCallback = executor->scheduleWork( + stdx::bind(&QuorumChecker::_startQuorumCheck, this, stdx::placeholders::_1)); + if (!startCheckCallback.isOK()) { + _signalSufficientResponsesReceived(executor); // Clean up. + executor->cancel(finishCheckCallback.getValue()); // Clean up. + return startCheckCallback.getStatus(); + } + + executor->wait(finishCheckCallback.getValue()); + + return _finalStatus; + } + + void QuorumChecker::_startQuorumCheck(const ReplicationExecutor::CallbackData& cbData) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + const bool isInitialConfig = _rsConfig->getConfigVersion() == 1; + const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex); + + if (myConfig.isVoter()) { + _voters.push_back(myConfig.getHostAndPort()); + } + if (myConfig.isElectable()) { + _numElectable = 1; + } + _numResponses = 1; // We "responded" to ourself already. + + if (_haveReceivedSufficientReplies()) { + _signalSufficientResponsesReceived(cbData.executor); + } + + // TODO: Call a helper to make the request object. + const BSONObj hbRequest = BSON( + "replSetHeartbeat" << _rsConfig->getReplSetName() << + "v" << _rsConfig->getConfigVersion() << + "pv" << 1 << + "checkEmpty" << isInitialConfig << + "from" << myConfig.getHostAndPort().toString() << + "fromId" << myConfig.getId()); + + // Send a bunch of heartbeat requests. + // Schedule an operation when a "sufficient" number of them have completed, and use that + // to compute the quorum check results. + // Wait for the "completion" callback to finish, and then it's OK to return the results. + for (int i = 0; i < _rsConfig->getNumMembers(); ++i) { + if (_myIndex == i) { + // No need to check self for liveness or unreadiness. 
+ continue; + } + const StatusWith<ReplicationExecutor::CallbackHandle> cbh = + cbData.executor->scheduleRemoteCommand( + ReplicationExecutor::RemoteCommandRequest( + _rsConfig->getMemberAt(i).getHostAndPort(), + "admin", + hbRequest, + _rsConfig->getHeartbeatTimeoutPeriodMillis()), + stdx::bind(&QuorumChecker::_onQuorumCheckHeartbeatResponse, + this, + stdx::placeholders::_1, + i)); + if (!cbh.isOK()) { + _vetoStatus = cbh.getStatus(); + _signalSufficientResponsesReceived(cbData.executor); + return; + } + _hbResponseCallbacks.push_back(cbh.getValue()); + } + } + + void QuorumChecker::_onQuorumCheckHeartbeatResponse( + const ReplicationExecutor::RemoteCommandCallbackData& cbData, + const int memberIndex) { + + if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { + // If this callback has been canceled, it's not safe to look at *this. However, + // QuorumChecker::run has already returned or will without the information we + // can provide here. + return; + } + + _tabulateHeartbeatResponse(cbData, memberIndex); + + if (_haveReceivedSufficientReplies()) { + _signalSufficientResponsesReceived(cbData.executor); + } + } + + void QuorumChecker::_onQuorumCheckComplete(const ReplicationExecutor::CallbackData& cbData) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + // If this callback has been canceled, it's not safe to look at *this. However, + // QuorumChecker::run has already returned or will without the information we + // can provide here. + return; + } + + // Cancel all the heartbeat callbacks, so that they do not attempt to access QuorumChecker + // state after this callback completes. 
+ std::for_each(_hbResponseCallbacks.begin(), + _hbResponseCallbacks.end(), + stdx::bind(&ReplicationExecutor::cancel, + cbData.executor, + stdx::placeholders::_1)); + + if (!_vetoStatus.isOK()) { + _finalStatus = _vetoStatus; + return; + } + if (_rsConfig->getConfigVersion() == 1 && !_down.empty()) { + str::stream message; + message << "Could not contact the following nodes during replica set initiation: " << + _down.front().toString(); + for (size_t i = 1; i < _down.size(); ++i) { + message << ", " << _down[i].toString(); + } + _finalStatus = Status(ErrorCodes::NodeNotFound, message); + return; + } + if (_numElectable == 0) { + _finalStatus = Status( + ErrorCodes::NodeNotFound, "Quorum check failed because no " + "electable nodes responded; at least one required for config"); + return; + } + if (int(_voters.size()) < _rsConfig->getMajorityVoteCount()) { + str::stream message; + message << "Quorum check failed because not enough voting nodes responded; required " << + _rsConfig->getMajorityVoteCount() << " but "; + + if (_voters.size() == 0) { + message << "none responded"; + } + else { + message << "only the following " << _voters.size() << + " voting nodes responded: " << _voters.front().toString(); + for (size_t i = 1; i < _voters.size(); ++i) { + message << ", " << _voters[i].toString(); + } + } + _finalStatus = Status(ErrorCodes::NodeNotFound, message); + return; + } + _finalStatus = Status::OK(); + } + + void QuorumChecker::_tabulateHeartbeatResponse( + const ReplicationExecutor::RemoteCommandCallbackData& cbData, + const int memberIndex) { + + ++_numResponses; + if (!cbData.response.isOK()) { + warning() << "Failed to complete heartbeat request to " << cbData.request.target << + "; " << cbData.response.getStatus(); + _down.push_back(cbData.request.target); + return; + } + BSONObj res = cbData.response.getValue(); + if (res["mismatch"].trueValue()) { + std::string message = str::stream() << "Our set name did not match that of " << + 
cbData.request.target.toString(); + _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); + warning() << message; + return; + } + if (res.getStringField("set")[0] != '\0') { + if (res["v"].numberInt() >= _rsConfig->getConfigVersion()) { + std::string message = str::stream() << "Our config version of " << + _rsConfig->getConfigVersion() << + " is no larger than the version on " << cbData.request.target.toString() << + ", which is " << res["v"].toString(); + _vetoStatus = Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, message); + warning() << message; + return; + } + } + if (!res["ok"].trueValue()) { + warning() << "Got error response on heartbeat request to " << cbData.request.target << + "; " << res; + _down.push_back(_rsConfig->getMemberAt(memberIndex).getHostAndPort()); + return; + } + + const MemberConfig& memberConfig = _rsConfig->getMemberAt(memberIndex); + if (memberConfig.isElectable()) { + ++_numElectable; + } + if (memberConfig.isVoter()) { + _voters.push_back(cbData.request.target); + } + } + + bool QuorumChecker::_haveReceivedSufficientReplies() const { + if (!_vetoStatus.isOK() || _numResponses == _rsConfig->getNumMembers()) { + // Vetoed or everybody has responded. All done. + return true; + } + if (_rsConfig->getConfigVersion() == 1) { + // Have not received responses from every member, and the proposed config + // version is 1 (initial configuration). Keep waiting. + return false; + } + if (_numElectable == 0) { + // Have not heard from at least one electable node. Keep waiting. + return false; + } + if (int(_voters.size()) < _rsConfig->getMajorityVoteCount()) { + // Have not heard from a majority of voters. Keep waiting. + return false; + } + + // Have heard from a majority of voters and one electable node. All done. 
+ return true; + } + + void QuorumChecker::_signalSufficientResponsesReceived(ReplicationExecutor* executor) { + if (_sufficientResponsesReceived.isValid()) { + executor->signalEvent(_sufficientResponsesReceived); + _sufficientResponsesReceived = ReplicationExecutor::EventHandle(); + } + } + +} // namespace + + Status checkQuorumForInitiate(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + const int myIndex) { + invariant(rsConfig.getConfigVersion() == 1); + QuorumChecker checker(&rsConfig, myIndex); + return checker.run(executor); + } + + Status checkQuorumForReconfig(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + const int myIndex) { + invariant(rsConfig.getConfigVersion() > 1); + QuorumChecker checker(&rsConfig, myIndex); + return checker.run(executor); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/check_quorum_for_config_change.h b/src/mongo/db/repl/check_quorum_for_config_change.h new file mode 100644 index 00000000000..be824d3bf98 --- /dev/null +++ b/src/mongo/db/repl/check_quorum_for_config_change.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { + + class Status; + +namespace repl { + + class ReplicaSetConfig; + class ReplicationExecutor; + + /** + * Performs a quorum call to determine if a sufficient number of nodes are up + * to initiate a replica set with configuration "rsConfig". + * + * "myIndex" is the index of this node's member configuration in "rsConfig". + * "executor" is the event loop in which to schedule network/aysnchronous processing. + * + * For purposes of initiate, a quorum is only met if all of the following conditions + * are met: + * - All nodes respond. + * - No nodes other than the node running the quorum check have data. + * - No nodes are already joined to a replica set. + * - No node reports a replica set name other than the one in "rsConfig". + */ + Status checkQuorumForInitiate(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + const int myIndex); + + /** + * Performs a quorum call to determine if a sufficient number of nodes are up + * to replace the current replica set configuration with "rsConfig". + * + * "myIndex" is the index of this node's member configuration in "rsConfig". 
+ * "executor" is the event loop in which to schedule network/aysnchronous processing. + * + * For purposes of reconfig, a quorum is only met if all of the following conditions + * are met: + * - A majority of voting nodes respond. + * - At least one electable node responds. + * - No responding node reports a replica set name other than the one in "rsConfig". + * - All responding nodes report a config version less than the one in "rsConfig". + */ + Status checkQuorumForReconfig(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + const int myIndex); + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp new file mode 100644 index 00000000000..4a1396fb780 --- /dev/null +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -0,0 +1,545 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/thread.hpp> +#include <boost/scoped_ptr.hpp> + +#include "mongo/base/status.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/check_quorum_for_config_change.h" +#include "mongo/db/repl/network_interface_mock.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/stdx/functional.h" +#include "mongo/unittest/unittest.h" + +#define ASSERT_REASON_CONTAINS(STATUS, PATTERN) do { \ + const mongo::Status s_ = (STATUS); \ + ASSERT_FALSE(s_.reason().find(PATTERN) == std::string::npos) << \ + #STATUS ".reason() == " << s_.reason(); \ + } while (false) + +#define ASSERT_NOT_REASON_CONTAINS(STATUS, PATTERN) do { \ + const mongo::Status s_ = (STATUS); \ + ASSERT_TRUE(s_.reason().find(PATTERN) == std::string::npos) << \ + #STATUS ".reason() == " << s_.reason(); \ + } while (false) + +namespace mongo { +namespace repl { +namespace { + + typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest; + + class CheckQuorumTest : public mongo::unittest::Test { + protected: + NetworkInterfaceMock* _net; + boost::scoped_ptr<ReplicationExecutor> _executor; + boost::scoped_ptr<boost::thread> _executorThread; + + private: + void setUp(); + void tearDown(); + }; + + class CheckQuorumForInitiate : public CheckQuorumTest {}; + class CheckQuorumForReconfig : public CheckQuorumTest {}; + + void CheckQuorumTest::setUp() { + _net = new NetworkInterfaceMock; + _executor.reset(new ReplicationExecutor(_net)); + _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, + _executor.get()))); + 
} + + void CheckQuorumTest::tearDown() { + _net->unblockAll(); + _executor->shutdown(); + _executorThread->join(); + } + + ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) { + ReplicaSetConfig config; + ASSERT_OK(config.initialize(configBson)); + ASSERT_OK(config.validate()); + return config; + } + + TEST_F(CheckQuorumForInitiate, ValidSingleNodeSet) { + ReplicaSetConfig config = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1")))); + ASSERT_OK(checkQuorumForInitiate(_executor.get(), config, 0)); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckCanceledByShutdown) { + _executor->shutdown(); + ReplicaSetConfig config = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1")))); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, + checkQuorumForInitiate(_executor.get(), config, 0)); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSeveralDownNodes) { + // In this test, "we" are host "h3:1". All other nodes time out on + // their heartbeat request, and so the quorum check for initiate + // will fail because some members were unavailable. 
+ ReplicaSetConfig config = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1") << + BSON("_id" << 5 << "host" << "h5:1")))); + Status status = checkQuorumForInitiate(_executor.get(), config, 2); + ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); + ASSERT_REASON_CONTAINS( + status, "Could not contact the following nodes during replica set initiation"); + ASSERT_REASON_CONTAINS(status, "h1:1"); + ASSERT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + ASSERT_REASON_CONTAINS(status, "h4:1"); + ASSERT_REASON_CONTAINS(status, "h5:1"); + } + + const BSONObj makeHeartbeatRequest(const ReplicaSetConfig& rsConfig, int myConfigIndex) { + const MemberConfig& myConfig = rsConfig.getMemberAt(myConfigIndex); + return BSON( + "replSetHeartbeat" << rsConfig.getReplSetName() << + "v" << rsConfig.getConfigVersion() << + "pv" << 1 << + "checkEmpty" << (rsConfig.getConfigVersion() == 1) << + "from" << myConfig.getHostAndPort().toString() << + "fromId" << myConfig.getId()); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckSuccessForFiveNodes) { + // In this test, "we" are host "h3:1". All nodes respond successfully to their heartbeat + // requests, and the quorum check succeeds. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1") << + BSON("_id" << 5 << "host" << "h5:1")))); + const int myConfigIndex = 2; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h2", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h4", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + + ASSERT_OK(checkQuorumForInitiate(_executor.get(), rsConfig, myConfigIndex)); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToOneDownNode) { + // In this test, "we" are host "h3:1". All nodes except "h2:1" respond + // successfully to their heartbeat requests, but quorum check fails because + // all nodes must be available for initiate. This is so even though "h2" + // is neither voting nor electable. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1" << + "priority" << 0 << "votes" << 0) << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1") << + BSON("_id" << 5 << "host" << "h5:1") << + BSON("_id" << 6 << "host" << "h6:1")))); + const int myConfigIndex = 2; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h4", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h6", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + + Status status = checkQuorumForInitiate(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); + ASSERT_REASON_CONTAINS( + status, "Could not contact the following nodes during replica set initiation"); + ASSERT_NOT_REASON_CONTAINS(status, "h1:1"); + ASSERT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h4:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h5:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h6:1"); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetNameMismatch) { + // In this test, "we" are host "h3:1". All nodes respond + // successfully to their heartbeat requests, but quorum check fails because + // "h4" declares that the requested replica set name was not what it expected. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1") << + BSON("_id" << 5 << "host" << "h5:1")))); + const int myConfigIndex = 2; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h2", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h4", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 0 << "mismatch" << true))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + + Status status = checkQuorumForInitiate(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); + ASSERT_REASON_CONTAINS( + status, "Our set name did not match"); + ASSERT_NOT_REASON_CONTAINS(status, "h1:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + ASSERT_REASON_CONTAINS(status, "h4:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h5:1"); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToInitializedNode) { + // In this test, "we" are host "h3:1". All nodes respond + // successfully to their heartbeat requests, but quorum check fails because + // "h5" declares that it is already initialized. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1") << + BSON("_id" << 5 << "host" << "h5:1")))); + const int myConfigIndex = 2; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h2", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h4", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 0 << "set" << "rs0" << "v" << 1))); + + Status status = checkQuorumForInitiate(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); + ASSERT_REASON_CONTAINS( + status, "Our config version of"); + ASSERT_REASON_CONTAINS( + status, "is no larger than the version"); + ASSERT_NOT_REASON_CONTAINS(status, "h1:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h4:1"); + ASSERT_REASON_CONTAINS(status, "h5:1"); + } + + TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToInitializedNodeOnlyOneRespondent) { + // In this test, "we" are host "h3:1". Only node "h5" responds before the test completes, + // and quorum check fails because "h5" declares that it is already initialized. + // + // Compare to QuorumCheckFailedDueToInitializedNode, above. 
+
+        const ReplicaSetConfig rsConfig = assertMakeRSConfig(
+                BSON("_id" << "rs0" <<
+                     "version" << 1 <<
+                     "members" << BSON_ARRAY(
+                             BSON("_id" << 1 << "host" << "h1:1") <<
+                             BSON("_id" << 2 << "host" << "h2:1") <<
+                             BSON("_id" << 3 << "host" << "h3:1") <<
+                             BSON("_id" << 4 << "host" << "h4:1") <<
+                             BSON("_id" << 5 << "host" << "h5:1"))));
+        const int myConfigIndex = 2;
+        const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
+
+        // Responses from nodes h1, h2 and h4 are blocked until after the test
+        // completes.
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 1)),
+                          true);
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h2", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 1)),
+                          true);
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h4", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 1)),
+                          true);
+
+        // h5 responds, with a version incompatibility.
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 0 << "set" << "rs0" << "v" << 1)));
+
+        Status status = checkQuorumForInitiate(_executor.get(), rsConfig, myConfigIndex);
+        ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status);
+        ASSERT_REASON_CONTAINS(
+                status, "Our config version of");
+        ASSERT_REASON_CONTAINS(
+                status, "is no larger than the version");
+        ASSERT_NOT_REASON_CONTAINS(status, "h1:1");
+        ASSERT_NOT_REASON_CONTAINS(status, "h2:1");
+        ASSERT_NOT_REASON_CONTAINS(status, "h3:1");
+        ASSERT_NOT_REASON_CONTAINS(status, "h4:1");
+        ASSERT_REASON_CONTAINS(status, "h5:1");
+    }
+
+    TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToHigherConfigVersion) {
+        // In this test, "we" are host "h3:1". The request to "h2" times out,
+        // and the request to "h1" comes back indicating a higher config version.
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 2 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1")))); + const int myConfigIndex = 2; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 0 << "set" << "rs0" << "v" << 5))); + + Status status = checkQuorumForReconfig(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); + ASSERT_REASON_CONTAINS( + status, "Our config version of"); + ASSERT_REASON_CONTAINS( + status, "is no larger than the version"); + ASSERT_REASON_CONTAINS(status, "h1:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + } + + TEST_F(CheckQuorumForReconfig, QuorumCheckVetoedDueToIncompatibleSetName) { + // In this test, "we" are host "h3:1". The request to "h1" times out, + // and the request to "h2" comes back indicating an incompatible set name. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 2 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1")))); + const int myConfigIndex = 2; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h2", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 0 << "mismatch" << true))); + + Status status = checkQuorumForReconfig(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NewReplicaSetConfigurationIncompatible, status); + ASSERT_REASON_CONTAINS(status, "Our set name did not match"); + ASSERT_NOT_REASON_CONTAINS(status, "h1:1"); + ASSERT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + + } + + TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientVoters) { + // In this test, "we" are host "h4". Only "h1", "h2" and "h3" are voters, + // and of the voters, only "h1" responds. As a result, quorum check fails. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 2 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1" << "votes" << 0) << + BSON("_id" << 5 << "host" << "h5:1" << "votes" << 0)))); + const int myConfigIndex = 3; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + Status status = checkQuorumForReconfig(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); + ASSERT_REASON_CONTAINS(status, "not enough voting nodes responded; required 2 but only"); + ASSERT_REASON_CONTAINS(status, "h1:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h2:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h3:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h4:1"); + ASSERT_NOT_REASON_CONTAINS(status, "h5:1"); + } + + TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) { + // In this test, "we" are host "h4". Only "h1", "h2" and "h3" are electable, + // and none of them respond. 
+ + const ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "rs0" << + "version" << 2 << + "members" << BSON_ARRAY( + BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1") << + BSON("_id" << 4 << "host" << "h4:1" << "priority" << 0) << + BSON("_id" << 5 << "host" << "h5:1" << "priority" << 0)))); + const int myConfigIndex = 3; + const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex); + + _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1), + "admin", + hbRequest), + StatusWith<BSONObj>(BSON("ok" << 1))); + Status status = checkQuorumForReconfig(_executor.get(), rsConfig, myConfigIndex); + ASSERT_EQUALS(ErrorCodes::NodeNotFound, status); + ASSERT_REASON_CONTAINS(status, "no electable nodes responded"); + } + + // TODO: Succeed with minimal quorum. + TEST_F(CheckQuorumForReconfig, QuorumCheckSucceedsWithAsSoonAsPossible) { + // In this test, "we" are host "h4". Only "h1", "h2" and "h3" can vote. + // This test should succeed as soon as h1 and h2 respond, so we block + // h3 and h5 from responding or timing out until the test completes. 
+
+        const ReplicaSetConfig rsConfig = assertMakeRSConfig(
+                BSON("_id" << "rs0" <<
+                     "version" << 2 <<
+                     "members" << BSON_ARRAY(
+                             BSON("_id" << 1 << "host" << "h1:1") <<
+                             BSON("_id" << 2 << "host" << "h2:1") <<
+                             BSON("_id" << 3 << "host" << "h3:1") <<
+                             BSON("_id" << 4 << "host" << "h4:1" << "votes" << 0) <<
+                             BSON("_id" << 5 << "host" << "h5:1" << "votes" << 0))));
+        const int myConfigIndex = 3;
+        const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
+
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h1", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 1)));
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h2", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 1)));
+
+        // If this message arrived, the reconfig would be vetoed, but it is delayed
+        // until the quorum check completes, and so has no effect.
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h3", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 0 << "set" << "rs0" << "v" << 3)),
+                          true);
+
+        // Ditto RE veto and no effect.
+        _net->addResponse(RemoteCommandRequest(HostAndPort("h5", 1),
+                                               "admin",
+                                               hbRequest),
+                          StatusWith<BSONObj>(BSON("ok" << 0 << "set" << "rs0" << "v" << 3)),
+                          true);
+        ASSERT_OK(checkQuorumForReconfig(_executor.get(), rsConfig, myConfigIndex));
+    }
+
+}  // namespace
+}  // namespace repl
+}  // namespace mongo
diff --git a/src/mongo/db/repl/replica_set_config.h b/src/mongo/db/repl/replica_set_config.h
index bf879dd36aa..73571d780e1 100644
--- a/src/mongo/db/repl/replica_set_config.h
+++ b/src/mongo/db/repl/replica_set_config.h
@@ -130,6 +130,15 @@ namespace repl {
         Seconds getHeartbeatTimeoutPeriod() const { return _heartbeatTimeoutPeriod; }
 
         /**
+         * Gets the amount of time to wait for a response to heartbeats sent to other
+         * nodes in the replica set, as above, but returns a Milliseconds instead of
+         * Seconds object.
+ */ + Milliseconds getHeartbeatTimeoutPeriodMillis() const { + return Milliseconds(_heartbeatTimeoutPeriod.total_milliseconds()); + } + + /** * Gets the number of nodes that constitutes a "majority" in this replica set, * for purposes of replicating data. */ |