diff options
Diffstat (limited to 'src')
24 files changed, 1199 insertions, 50 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 1e0a12989f4..a8e96c93f40 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -113,6 +113,7 @@ env.Library('repl_coordinator_impl', 'replication_coordinator_impl.cpp', 'replication_coordinator_impl_elect.cpp', 'replication_coordinator_impl_heartbeat.cpp', + 'vote_requester.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/common', @@ -186,6 +187,12 @@ env.CppUnitTest('freshness_checker_test', 'replica_set_messages', 'replmocks']) +env.CppUnitTest('vote_requester_test', + 'vote_requester_test.cpp', + LIBDEPS=['repl_coordinator_impl', + 'replica_set_messages', + 'replmocks']) + env.CppUnitTest('election_winner_declarer_test', 'election_winner_declarer_test.cpp', LIBDEPS=['repl_coordinator_impl', @@ -247,9 +254,11 @@ env.Library('replica_set_messages', 'repl_set_heartbeat_response.cpp', 'repl_set_heartbeat_response_v1.cpp', 'repl_set_html_summary.cpp', + 'repl_set_request_votes_args.cpp', 'replica_set_config.cpp', 'replica_set_tag.cpp', 'update_position_args.cpp', + 'last_vote.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/bson', @@ -283,7 +292,6 @@ env.Library( 'replset_commands.cpp', 'repl_set_command.cpp', 'repl_set_request_votes.cpp', - 'repl_set_request_votes_args.cpp', 'repl_set_declare_election_winner.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/repl/last_vote.cpp b/src/mongo/db/repl/last_vote.cpp new file mode 100644 index 00000000000..e7a1b178bf4 --- /dev/null +++ b/src/mongo/db/repl/last_vote.cpp @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/repl/last_vote.h" + +#include "mongo/bson/util/bson_check.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/jsobj.h" + +namespace mongo { +namespace repl { +namespace { + + const std::string kCandidateIdFieldName = "candidateId"; + const std::string kTermFieldName = "term"; + + const std::string kLegalFieldNames[] = { + kCandidateIdFieldName, + kTermFieldName, + }; + +} // namespace + + Status LastVote::initialize(const BSONObj& argsObj) { + Status status = bsonCheckOnlyHasFields("VotedFar", + argsObj, + kLegalFieldNames); + if (!status.isOK()) + return status; + + status = bsonExtractIntegerField(argsObj, kTermFieldName, &_term); + if (!status.isOK()) + return status; + + status = bsonExtractIntegerField(argsObj, kCandidateIdFieldName, &_candidateId); + if (!status.isOK()) + return status; + + return Status::OK(); + } + + void LastVote::setTerm(long long term) { + _term = term; + } + + void LastVote::setCandidateId(long long candidateId) { + _candidateId = candidateId; + } + + long long LastVote::getTerm() const { + return _term; + } + + long long LastVote::getCandidateId() const { + return _candidateId; + } + + + BSONObj LastVote::toBSON() const { + BSONObjBuilder builder; + builder.append(kTermFieldName, _term); + builder.append(kCandidateIdFieldName, _candidateId); + return builder.obj(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/last_vote.h b/src/mongo/db/repl/last_vote.h new file mode 100644 index 00000000000..b2314823f00 --- /dev/null +++ b/src/mongo/db/repl/last_vote.h @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
namespace mongo {

    class BSONObj;
    class BSONObjBuilder;
    class Status;

namespace repl {

    /**
     * Value type holding the two numbers of a "last vote" record: a term and the id of the
     * candidate voted for. Both start out as -1 until set or loaded via initialize().
     */
    class LastVote {
    public:
        /**
         * Populates this object from 'argsObj'; the returned Status reports whether the
         * document could be read.
         */
        Status initialize(const BSONObj& argsObj);

        /** Returns this vote serialized as a BSON document. */
        BSONObj toBSON() const;

        // Accessors.
        long long getCandidateId() const;
        long long getTerm() const;

        // Mutators.
        void setCandidateId(long long candidateId);
        void setTerm(long long term);

    private:
        long long _candidateId = -1;  // -1 means "not set"
        long long _term = -1;         // -1 means "not set"
    };

} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_request_votes_args.h b/src/mongo/db/repl/repl_set_request_votes_args.h index 216ef159e8a..f7152fe838e 100644 --- a/src/mongo/db/repl/repl_set_request_votes_args.h +++ b/src/mongo/db/repl/repl_set_request_votes_args.h @@ -62,12 +62,18 @@ namespace repl { public: Status initialize(const BSONObj& argsObj); + void setOk(bool ok) { _ok = ok; } + void setVoteGranted(bool voteGranted) { _voteGranted = voteGranted; } + void setTerm(long long term) { _term = term; } + void setReason(const std::string& reason) { _reason = reason; } + bool getOk() const; long long getTerm() const; bool getVoteGranted() const; const std::string& getReason() const; void addToBSON(BSONObjBuilder* builder) const; + BSONObj toBSON() const; private: bool _ok = false; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 88304cdef0a..660c7b47407 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -54,6 +54,7 @@ namespace repl { class HandshakeArgs; class IsMasterResponse; class OplogReader; + class OpTime; class ReplSetDeclareElectionWinnerArgs; class ReplSetDeclareElectionWinnerResponse; class ReplSetHeartbeatArgs; @@ -277,6 +278,11 @@ namespace repl { virtual void setMyHeartbeatMessage(const std::string& msg) = 0; /** + * Returns the last optime recorded by setMyLastOptimeV1. + */ + virtual OpTime getMyLastOptimeV1() const = 0; + + /** * Returns the last optime recorded by setMyLastOptime. */ virtual Timestamp getMyLastOptime() const = 0; @@ -570,7 +576,8 @@ namespace repl { * Handles an incoming replSetRequestVotes command. * Adds BSON to 'resultObj'; returns a Status with either OK or an error message. 
*/ - virtual Status processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + virtual Status processReplSetRequestVotes(OperationContext* txn, + const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) = 0; /* diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 07fa9ed921a..bc7091a8179 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -45,6 +45,8 @@ namespace mongo { namespace repl { + class LastVote; + /** * This class represents the interface the ReplicationCoordinator uses to interact with the * rest of the system. All functionality of the ReplicationCoordinatorImpl that would introduce @@ -112,6 +114,17 @@ namespace repl { virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config) = 0; /** + * Gets the replica set lastVote document from local storage, or returns an error. + */ + virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* txn) = 0; + + /** + * Stores the replica set lastVote document in local storage, or returns an error. + */ + virtual Status storeLocalLastVoteDocument(OperationContext* txn, + const LastVote& lastVote) = 0; + + /** * Sets the global opTime to be 'newTime'. 
*/ virtual void setGlobalTimestamp(const Timestamp& newTime) = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 5e3f498fdeb..fabbbf8b036 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -54,6 +54,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/last_vote.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" @@ -69,6 +70,8 @@ namespace repl { namespace { const char configCollectionName[] = "local.system.replset"; const char configDatabaseName[] = "local"; + const char lastVoteCollectionName[] = "local.replset.election"; + const char lastVoteDatabaseName[] = "local"; const char meCollectionName[] = "local.me"; const char meDatabaseName[] = "local"; const char tsFieldName[] = "ts"; @@ -199,6 +202,49 @@ namespace { } + StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument( + OperationContext* txn) { + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + BSONObj lastVoteObj; + if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) { + return StatusWith<LastVote>( + ErrorCodes::NoMatchingDocument, + str::stream() << "Did not find replica set lastVote document in " + << lastVoteCollectionName); + } + LastVote lastVote; + lastVote.initialize(lastVoteObj); + return StatusWith<LastVote>(lastVote); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, + "load replica set lastVote", + lastVoteCollectionName); + } + catch (const DBException& ex) { + return StatusWith<LastVote>(ex.toStatus()); + } + } + + Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( + OperationContext* txn, + const LastVote& lastVote) { + BSONObj lastVoteObj = lastVote.toBSON(); + try 
{ + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X); + Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj); + return Status::OK(); + } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, + "save replica set lastVote", + lastVoteCollectionName); + } + catch (const DBException& ex) { + return ex.toStatus(); + } + + } + void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { setNewOptime(newTime); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 482b143f629..2c13ac634e5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -54,6 +54,8 @@ namespace repl { virtual bool isSelf(const HostAndPort& host); virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config); + virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* txn); + virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote); virtual void setGlobalTimestamp(const Timestamp& newTime); virtual StatusWith<Timestamp> loadLastOpTime(OperationContext* txn); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index bd4e194c5fc..3060e55f1a2 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -44,11 +44,14 @@ namespace repl { ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() : _localRsConfigDocument(ErrorCodes::NoMatchingDocument, 
"No local config document"), + _localRsLastVoteDocument(ErrorCodes::NoMatchingDocument, "No local lastVote document"), _lastOpTime(ErrorCodes::NoMatchingDocument, "No last oplog entry"), - _canAcquireGlobalSharedLock(true), - _storeLocalConfigDocumentStatus(Status::OK()), - _storeLocalConfigDocumentShouldHang(false), - _connectionsClosed(false) { + _canAcquireGlobalSharedLock(true), + _storeLocalConfigDocumentStatus(Status::OK()), + _storeLocalLastVoteDocumentStatus(Status::OK()), + _storeLocalConfigDocumentShouldHang(false), + _storeLocalLastVoteDocumentShouldHang(false), + _connectionsClosed(false) { } ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {} @@ -90,9 +93,9 @@ namespace repl { OperationContext* txn, const BSONObj& config) { { - boost::unique_lock<boost::mutex> lock(_shouldHangMutex); + boost::unique_lock<boost::mutex> lock(_shouldHangConfigMutex); while (_storeLocalConfigDocumentShouldHang) { - _shouldHangCondVar.wait(lock); + _shouldHangConfigCondVar.wait(lock); } } if (_storeLocalConfigDocumentStatus.isOK()) { @@ -108,6 +111,33 @@ namespace repl { _localRsConfigDocument = localConfigDocument; } + StatusWith<LastVote> ReplicationCoordinatorExternalStateMock::loadLocalLastVoteDocument( + OperationContext* txn) { + return _localRsLastVoteDocument; + } + + Status ReplicationCoordinatorExternalStateMock::storeLocalLastVoteDocument( + OperationContext* txn, + const LastVote& lastVote) { + { + boost::unique_lock<boost::mutex> lock(_shouldHangLastVoteMutex); + while (_storeLocalLastVoteDocumentShouldHang) { + _shouldHangLastVoteCondVar.wait(lock); + } + } + if (_storeLocalLastVoteDocumentStatus.isOK()) { + setLocalLastVoteDocument(StatusWith<LastVote>(lastVote)); + return Status::OK(); + } + return _storeLocalLastVoteDocumentStatus; + } + + void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument( + const StatusWith<LastVote>& localLastVoteDocument) { + + _localRsLastVoteDocument = localLastVoteDocument; + } + 
void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(const Timestamp& newTime) { } @@ -126,10 +156,23 @@ namespace repl { } void ReplicationCoordinatorExternalStateMock::setStoreLocalConfigDocumentToHang(bool hang) { - boost::unique_lock<boost::mutex> lock(_shouldHangMutex); + boost::unique_lock<boost::mutex> lock(_shouldHangConfigMutex); _storeLocalConfigDocumentShouldHang = hang; if (!hang) { - _shouldHangCondVar.notify_all(); + _shouldHangConfigCondVar.notify_all(); + } + } + + void ReplicationCoordinatorExternalStateMock::setStoreLocalLastVoteDocumentStatus( + Status status) { + _storeLocalLastVoteDocumentStatus = status; + } + + void ReplicationCoordinatorExternalStateMock::setStoreLocalLastVoteDocumentToHang(bool hang) { + boost::unique_lock<boost::mutex> lock(_shouldHangLastVoteMutex); + _storeLocalLastVoteDocumentShouldHang = hang; + if (!hang) { + _shouldHangLastVoteCondVar.notify_all(); } } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 8f4366b7e17..48473155717 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -38,6 +38,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/replication_coordinator_external_state.h" +#include "mongo/db/repl/last_vote.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -60,6 +61,8 @@ namespace repl { virtual HostAndPort getClientHostAndPort(const OperationContext* txn); virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config); + virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* txn); + virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote); virtual void setGlobalTimestamp(const Timestamp& newTime); virtual 
StatusWith<Timestamp> loadLastOpTime(OperationContext* txn); virtual void closeConnections(); @@ -81,6 +84,11 @@ namespace repl { void setLocalConfigDocument(const StatusWith<BSONObj>& localConfigDocument); /** + * Sets the return value for subsequent calls to loadLocalLastVoteDocument(). + */ + void setLocalLastVoteDocument(const StatusWith<LastVote>& localLastVoteDocument); + + /** * Sets the return value for subsequent calls to getClientHostAndPort(). */ void setClientHostAndPort(const HostAndPort& clientHostAndPort); @@ -102,16 +110,34 @@ namespace repl { */ void setStoreLocalConfigDocumentToHang(bool hang); + /** + * Sets the return value for subsequent calls to storeLocalLastVoteDocument(). + * If "status" is Status::OK(), the subsequent calls will call the underlying function. + */ + void setStoreLocalLastVoteDocumentStatus(Status status); + + /** + * Sets whether or not subsequent calls to storeLocalLastVoteDocument() should hang + * indefinitely or not based on the value of "hang". 
+ */ + void setStoreLocalLastVoteDocumentToHang(bool hang); + private: StatusWith<BSONObj> _localRsConfigDocument; + StatusWith<LastVote> _localRsLastVoteDocument; StatusWith<Timestamp> _lastOpTime; std::vector<HostAndPort> _selfHosts; bool _canAcquireGlobalSharedLock; Status _storeLocalConfigDocumentStatus; + Status _storeLocalLastVoteDocumentStatus; // mutex and cond var for controlling storeLocalConfigDocument()'s hanging - boost::mutex _shouldHangMutex; - boost::condition _shouldHangCondVar; + boost::mutex _shouldHangConfigMutex; + boost::condition _shouldHangConfigCondVar; + // mutex and cond var for controlling storeLocalLastVoteDocument()'s hanging + boost::mutex _shouldHangLastVoteMutex; + boost::condition _shouldHangLastVoteCondVar; bool _storeLocalConfigDocumentShouldHang; + bool _storeLocalLastVoteDocumentShouldHang; bool _connectionsClosed; HostAndPort _clientHostAndPort; }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index f8a4031317e..bb023078021 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -57,6 +57,7 @@ #include "mongo/db/repl/rslog.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" +#include "mongo/db/repl/last_vote.h" #include "mongo/db/server_options.h" #include "mongo/db/write_concern_options.h" #include "mongo/stdx/functional.h" @@ -68,7 +69,7 @@ namespace mongo { namespace repl { - + namespace { typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; @@ -226,8 +227,24 @@ namespace { return _rsConfig; } + void ReplicationCoordinatorImpl::_updateLastVote(const LastVote& lastVote) { + _topCoord->loadLastVote(lastVote); + } + bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { + StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn); + if (!lastVote.isOK()) { + log() << "Did not find local 
voted for document at startup; " << lastVote.getStatus(); + } + else { + LastVote vote = lastVote.getValue(); + _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_updateLastVote, + this, + vote)); + } + StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn); if (!cfg.isOK()) { log() << "Did not find local replica set configuration document at startup; " << @@ -736,6 +753,16 @@ namespace { _externalState->forwardSlaveProgress(); // Must do this outside _mutex } + OpTime ReplicationCoordinatorImpl::getMyLastOptimeV1() const { + boost::lock_guard<boost::mutex> lock(_mutex); + return _getMyLastOptimeV1_inlock(); + } + + // TODO(dannenberg): _slaveInfo should store OpTimes and they shouldn't always have term=0 + OpTime ReplicationCoordinatorImpl::_getMyLastOptimeV1_inlock() const { + return OpTime(_slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime, 0); + } + Timestamp ReplicationCoordinatorImpl::getMyLastOptime() const { boost::lock_guard<boost::mutex> lock(_mutex); return _getMyLastOptime_inlock(); @@ -2411,11 +2438,52 @@ namespace { } Status ReplicationCoordinatorImpl::processReplSetRequestVotes( - const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) { + OperationContext* txn, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response) { if (!isV1ElectionProtocol()) { return {ErrorCodes::BadValue, "not using election protocol v1"}; } - return {ErrorCodes::CommandNotFound, "not implemented"}; + Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"}; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish, + this, + stdx::placeholders::_1, + args, + response, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); + } + _replExecutor.wait(cbh.getValue()); + if (response->getVoteGranted()) { + LastVote lastVote; + lastVote.setTerm(args.getTerm()); + 
lastVote.setCandidateId(args.getCandidateId()); + + Status status = _externalState->storeLocalConfigDocument(txn, lastVote.toBSON()); + if (!status.isOK()) { + error() << "replSetReconfig failed to store config document; " << status; + return status; + } + + } + return result; + } + + void ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish( + const ReplicationExecutor::CallbackData& cbData, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); + return; + } + + boost::unique_lock<boost::mutex> lk(_mutex); + _topCoord->processReplSetRequestVotes(args, response, getMyLastOptimeV1()); + *result = Status::OK(); } Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner( diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index aa2c7b2dd05..c445327010b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -39,6 +39,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/data_replicator.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_external_state.h" @@ -66,6 +67,7 @@ namespace repl { class ReplicaSetConfig; class SyncSourceFeedback; class TopologyCoordinator; + class LastVote; class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface { @@ -154,6 +156,8 @@ namespace repl { virtual Timestamp getMyLastOptime() const; + virtual OpTime getMyLastOptimeV1() const; + virtual OID getElectionId(); virtual OID getMyRID() const; @@ -237,7 +241,8 @@ namespace repl { virtual Timestamp getLastCommittedOpTime() const; - 
virtual Status processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + virtual Status processReplSetRequestVotes(OperationContext* txn, + const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response); virtual Status processReplSetDeclareElectionWinner( @@ -453,6 +458,15 @@ namespace repl { Status* result); /** + * Bottom half of processReplSetRequestVotes. + */ + void _processReplSetRequestVotes_finish( + const ReplicationExecutor::CallbackData& cbData, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + Status* result); + + /** * Scheduled to cause the ReplicationCoordinator to reconsider any state that might * need to change as a result of time passing - for instance becoming PRIMARY when a single * node replica set member's stepDown period ends. @@ -519,6 +533,7 @@ namespace repl { Timestamp _getMyLastOptime_inlock() const; + OpTime _getMyLastOptimeV1_inlock() const; /** * Bottom half of setFollowerMode. @@ -607,6 +622,15 @@ namespace repl { MemberState _getMemberState_inlock() const; /** + * Callback that gives the TopologyCoordinator an initial LastVote document from + * local storage. + * + * Called only during replication startup. All other updates come from the + * TopologyCoordinator itself. + */ + void _updateLastVote(const LastVote& lastVote); + + /** * Starts loading the replication configuration from local storage, and if it is valid, * schedules a callback (of _finishLoadLocalConfig) to set it as the current replica set * config (sets _rsConfig and _thisMembersConfigIndex). 
@@ -930,6 +954,7 @@ namespace repl { // Data Replicator used to replicate data DataReplicator _dr; // (S) + }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 6411cb8ea09..ca87ebf9fe3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -100,7 +100,7 @@ namespace { startCapturingLogMessages(); start(); stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Did not find local ")); + ASSERT_EQUALS(2, countLogLinesContaining("Did not find local ")); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index fd36c227898..9d7f28e62af 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -33,6 +33,7 @@ #include "mongo/base/status.h" #include "mongo/db/write_concern_options.h" #include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/optime.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -135,6 +136,10 @@ namespace repl { void ReplicationCoordinatorMock::resetMyLastOptime() {} + OpTime ReplicationCoordinatorMock::getMyLastOptimeV1() const { + return OpTime(); + } + Timestamp ReplicationCoordinatorMock::getMyLastOptime() const { // TODO return Timestamp(); @@ -300,8 +305,9 @@ namespace repl { } Status ReplicationCoordinatorMock::processReplSetRequestVotes( - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response) { + OperationContext* txn, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response) { return Status::OK(); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 6b346ab449e..79cf56507c8 100644 --- 
a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -99,6 +99,8 @@ namespace repl { virtual void setMyHeartbeatMessage(const std::string& msg); + virtual OpTime getMyLastOptimeV1() const; + virtual Timestamp getMyLastOptime() const; virtual OID getElectionId(); @@ -182,7 +184,8 @@ namespace repl { virtual Timestamp getLastCommittedOpTime() const; - virtual Status processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + virtual Status processReplSetRequestVotes(OperationContext* txn, + const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response); virtual Status processReplSetDeclareElectionWinner( diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 595138a20e7..a9222a84085 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -48,6 +48,7 @@ namespace repl { class ReplSetHeartbeatArgs; class ReplicaSetConfig; class TagSubgroup; + class LastVote; struct MemberState; /** @@ -367,6 +368,13 @@ namespace repl { virtual void summarizeAsHtml(ReplSetHtmlSummary* output) = 0; /** + * Prepares a ReplSetRequestVotesResponse. + */ + virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + const OpTime& lastAppliedOpTime) = 0; + + /** * Determines whether or not the newly elected primary is valid from our perspective. * If it is, sets the _currentPrimaryIndex and term to the received values. * If it is not, return ErrorCode::BadValue and the current term from our perspective. @@ -376,6 +384,13 @@ namespace repl { const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm) = 0; + /** + * Loads an initial LastVote document, which was read from local storage. + * + * Called only during replication startup. All other updates are done internally. 
+ */ + virtual void loadLastVote(const LastVote& lastVote) = 0; + protected: TopologyCoordinator() {} }; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 491516b44f8..7dde549b12b 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_set_html_summary.h" +#include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/server_parameters.h" @@ -55,7 +56,7 @@ namespace repl { using std::vector; - const Seconds TopologyCoordinatorImpl::LastVote::leaseTime = Seconds(30); + const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30); namespace { @@ -124,7 +125,6 @@ namespace { TopologyCoordinatorImpl::TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs) : _role(Role::follower), - _term(0), _currentPrimaryIndex(-1), _forceSyncSourceIndex(-1), _maxSyncSourceLagSecs(maxSyncSourceLagSecs), @@ -622,17 +622,17 @@ namespace { << highestPriority->getHostAndPort().toString(); vote = -10000; } - else if (_lastVote.when.millis + LastVote::leaseTime.total_milliseconds() >= now.millis && - _lastVote.whoId != args.whoid) { + else if (_voteLease.when.millis + VoteLease::leaseTime.total_milliseconds() >= now.millis && + _voteLease.whoId != args.whoid) { log() << "replSet voting no for " << hopeful->getHostAndPort().toString() - << "; voted for " << _lastVote.whoHostAndPort.toString() << ' ' - << (now.millis - _lastVote.when.millis) / 1000 << " secs ago"; + << "; voted for " << _voteLease.whoHostAndPort.toString() << ' ' + << (now.millis - _voteLease.when.millis) / 1000 << " secs ago"; } else { - _lastVote.when = now; - _lastVote.whoId = args.whoid; - _lastVote.whoHostAndPort = hopeful->getHostAndPort(); + 
_voteLease.when = now; + _voteLease.whoId = args.whoid; + _voteLease.whoHostAndPort = hopeful->getHostAndPort(); vote = _selfConfig().getNumVotes(); invariant(hopeful->getId() == args.whoid); if (vote > 0) { @@ -993,7 +993,7 @@ namespace { << (latestOpTime.getSecs() - highestPriorityMemberOptime.getSecs()) << " seconds behind me"; const Date_t until = now + - LastVote::leaseTime.total_milliseconds() + + VoteLease::leaseTime.total_milliseconds() + kHeartbeatInterval.total_milliseconds(); if (_electionSleepUntil < until) { _electionSleepUntil = until; @@ -1740,9 +1740,9 @@ namespace { if (_stepDownUntil > now) { result |= StepDownPeriodActive; } - if (_lastVote.whoId != -1 && - _lastVote.whoId !=_rsConfig.getMemberAt(_selfIndex).getId() && - _lastVote.when.millis + LastVote::leaseTime.total_milliseconds() >= now.millis) { + if (_voteLease.whoId != -1 && + _voteLease.whoId !=_rsConfig.getMemberAt(_selfIndex).getId() && + _voteLease.when.millis + VoteLease::leaseTime.total_milliseconds() >= now.millis) { result |= VotedTooRecently; } @@ -1770,7 +1770,7 @@ namespace { ss << "; "; } hasWrittenToStream = true; - ss << "I recently voted for " << _lastVote.whoHostAndPort.toString(); + ss << "I recently voted for " << _voteLease.whoHostAndPort.toString(); } if (ur & CannotSeeMajority) { if (hasWrittenToStream) { @@ -1879,16 +1879,16 @@ namespace { return false; } int selfId = _selfConfig().getId(); - if ((_lastVote.when + LastVote::leaseTime.total_milliseconds() >= now) - && (_lastVote.whoId != selfId)) { + if ((_voteLease.when + VoteLease::leaseTime.total_milliseconds() >= now) + && (_voteLease.whoId != selfId)) { log() << "not voting yea for " << selfId << - " voted for " << _lastVote.whoHostAndPort.toString() << ' ' << - (now - _lastVote.when) / 1000 << " secs ago"; + " voted for " << _voteLease.whoHostAndPort.toString() << ' ' << + (now - _voteLease.when) / 1000 << " secs ago"; return false; } - _lastVote.when = now; - _lastVote.whoId = selfId; - _lastVote.whoHostAndPort 
= _selfConfig().getHostAndPort(); + _voteLease.when = now; + _voteLease.whoId = selfId; + _voteLease.whoHostAndPort = _selfConfig().getHostAndPort(); return true; } @@ -1933,10 +1933,10 @@ namespace { _electionId = OID(); _role = Role::follower; - // Clear lastVote time, if we voted for ourselves in this election. + // Clear voteLease time, if we voted for ourselves in this election. // This will allow us to vote for others. - if (_lastVote.whoId == _selfConfig().getId()) { - _lastVote.when = 0; + if (_voteLease.whoId == _selfConfig().getId()) { + _voteLease.when = 0; } } @@ -2104,6 +2104,42 @@ namespace { output->setSelfHeartbeatMessage(_hbmsg); } + void TopologyCoordinatorImpl::processReplSetRequestVotes( + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + const OpTime& lastAppliedOpTime) { + response->setOk(true); + response->setTerm(_term); + + if (args.getTerm() < _term) { + response->setVoteGranted(false); + response->setReason("candidate's term is lower than mine"); + } + else if (args.getConfigVersion() != _rsConfig.getConfigVersion()) { + response->setVoteGranted(false); + response->setReason("candidate's config version differs from mine"); + } + else if (args.getSetName() != _rsConfig.getReplSetName()) { + response->setVoteGranted(false); + response->setReason("candidate's set name differs from mine"); + } + // TODO(dannenberg): switch comparison to OpTimes once lastAppliedOpTime is an OpTime again + else if (args.getLastCommittedOp() < lastAppliedOpTime) { + response->setVoteGranted(false); + response->setReason("candidate's data is staler than mine"); + } + else if (_lastVote.getTerm() == args.getTerm()) { + response->setVoteGranted(false); + response->setReason("already voted for another candidate this term"); + } + else { + _lastVote.setTerm(args.getTerm()); + _lastVote.setCandidateId(args.getCandidateId()); + response->setVoteGranted(true); + } + + } + Status TopologyCoordinatorImpl::processReplSetDeclareElectionWinner( 
const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm) { @@ -2127,5 +2163,9 @@ namespace { return Status::OK(); } + void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) { + _lastVote = lastVote; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 69f6b31df37..39d186c7308 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -37,6 +37,7 @@ #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/repl/last_vote.h" #include "mongo/util/time_support.h" namespace mongo { @@ -190,9 +191,14 @@ namespace repl { const Timestamp& lastCommitttedOpTime) const; Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm); + virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + const OpTime& lastAppliedOpTime); virtual void summarizeAsHtml(ReplSetHtmlSummary* output); + virtual void loadLastVote(const LastVote& lastVote); + //////////////////////////////////////////////////////////// // // Test support methods @@ -335,7 +341,7 @@ namespace repl { // This node's election term. The term is used as part of the consensus algorithm to elect // and maintain one primary (leader) node in the cluster. 
- long long _term; + long long _term = -1; // the index of the member we currently believe is primary, if one exists, otherwise -1 int _currentPrimaryIndex; @@ -393,15 +399,18 @@ namespace repl { PingMap _pings; // Last vote info from the election - struct LastVote { + struct VoteLease { static const Seconds leaseTime; - LastVote() : when(0), whoId(-1) { } + VoteLease() : when(0), whoId(-1) { } Date_t when; int whoId; HostAndPort whoHostAndPort; - } _lastVote; + } _voteLease; + + // V1 last vote info for elections + LastVote _lastVote; }; diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index ce99ca62ee9..3d314251934 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -35,6 +35,8 @@ #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" +#include "mongo/db/repl/repl_set_declare_election_winner_args.h" +#include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/unittest/unittest.h" @@ -4098,6 +4100,132 @@ namespace { logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Log()); } + TEST_F(TopoCoordTest, ProcessRequestVotesTwoRequestsForSameTerm) { + updateConfig(BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 10 << "host" << "hself") << + BSON("_id" << 20 << "host" << "h2") << + BSON("_id" << 30 << "host" << "h3"))), + 0); + setSelfMemberState(MemberState::RS_SECONDARY); + + ReplSetRequestVotesArgs args; + args.initialize(BSON("replSetRequestVotes" << 1 + << "setName" << "rs0" + << "term" << 1LL + << "candidateId" << 10LL + << "configVersion" << 1LL + << "lastCommittedOp" << BSON ("ts" << Timestamp(10, 0) + << "term" << 0LL))); + 
ReplSetRequestVotesResponse response; + OpTime lastAppliedOpTime; + + getTopoCoord().processReplSetRequestVotes(args, &response, lastAppliedOpTime); + ASSERT_EQUALS("", response.getReason()); + ASSERT_TRUE(response.getVoteGranted()); + + ReplSetRequestVotesArgs args2; + args2.initialize(BSON("replSetRequestVotes" << 1 + << "setName" << "rs0" + << "term" << 1LL + << "candidateId" << 20LL + << "configVersion" << 1LL + << "lastCommittedOp" << BSON ("ts" << Timestamp(10, 0) + << "term" << 0LL))); + ReplSetRequestVotesResponse response2; + + // different candidate same term, should be a problem + getTopoCoord().processReplSetRequestVotes(args2, &response2, lastAppliedOpTime); + ASSERT_EQUALS("already voted for another candidate this term", response2.getReason()); + ASSERT_FALSE(response2.getVoteGranted()); + + } + + TEST_F(TopoCoordTest, ProcessRequestVotesBadCommands) { + updateConfig(BSON("_id" << "rs0" << + "version" << 1 << + "members" << BSON_ARRAY( + BSON("_id" << 10 << "host" << "hself") << + BSON("_id" << 20 << "host" << "h2") << + BSON("_id" << 30 << "host" << "h3"))), + 0); + setSelfMemberState(MemberState::RS_SECONDARY); + + // mismatched setName + ReplSetRequestVotesArgs args; + args.initialize(BSON("replSetRequestVotes" << 1 + << "setName" << "wrongName" + << "term" << 1LL + << "candidateId" << 10LL + << "configVersion" << 1LL + << "lastCommittedOp" << BSON ("ts" << Timestamp(10, 0) + << "term" << 0LL))); + ReplSetRequestVotesResponse response; + OpTime lastAppliedOpTime; + + getTopoCoord().processReplSetRequestVotes(args, &response, lastAppliedOpTime); + ASSERT_EQUALS("candidate's set name differs from mine", response.getReason()); + ASSERT_FALSE(response.getVoteGranted()); + + // mismatched configVersion + ReplSetRequestVotesArgs args2; + args2.initialize(BSON("replSetRequestVotes" << 1 + << "setName" << "rs0" + << "term" << 1LL + << "candidateId" << 20LL + << "configVersion" << 0LL + << "lastCommittedOp" << BSON ("ts" << Timestamp(10, 0) + << "term" << 
0LL))); + ReplSetRequestVotesResponse response2; + + getTopoCoord().processReplSetRequestVotes(args2, &response2, lastAppliedOpTime); + ASSERT_EQUALS("candidate's config version differs from mine", response2.getReason()); + ASSERT_FALSE(response2.getVoteGranted()); + + // set term higher by receiving a replSetDeclareElectionWinnerCommand + ReplSetDeclareElectionWinnerArgs winnerArgs; + winnerArgs.initialize(BSON("replSetDeclareElectionWinner" << 1 + << "setName" << "rs0" + << "term" << 2 + << "winnerId" << 30)); + long long responseTerm; + ASSERT_OK(getTopoCoord().processReplSetDeclareElectionWinner(winnerArgs, &responseTerm)); + ASSERT_EQUALS(2, responseTerm); + + // stale term + ReplSetRequestVotesArgs args3; + args3.initialize(BSON("replSetRequestVotes" << 1 + << "setName" << "rs0" + << "term" << 1LL + << "candidateId" << 20LL + << "configVersion" << 1LL + << "lastCommittedOp" << BSON ("ts" << Timestamp(10, 0) + << "term" << 0LL))); + ReplSetRequestVotesResponse response3; + + getTopoCoord().processReplSetRequestVotes(args3, &response3, lastAppliedOpTime); + ASSERT_EQUALS("candidate's term is lower than mine", response3.getReason()); + ASSERT_EQUALS(2, response3.getTerm()); + ASSERT_FALSE(response3.getVoteGranted()); + + // stale OpTime + ReplSetRequestVotesArgs args4; + args4.initialize(BSON("replSetRequestVotes" << 1 + << "setName" << "rs0" + << "term" << 3LL + << "candidateId" << 20LL + << "configVersion" << 1LL + << "lastCommittedOp" << BSON ("ts" << Timestamp(10, 0) + << "term" << 0LL))); + ReplSetRequestVotesResponse response4; + OpTime lastAppliedOpTime2 = {Timestamp(20, 0), 0}; + + getTopoCoord().processReplSetRequestVotes(args4, &response4, lastAppliedOpTime2); + ASSERT_EQUALS("candidate's data is staler than mine", response4.getReason()); + ASSERT_FALSE(response4.getVoteGranted()); + } + } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp new file mode 100644 
index 00000000000..ecabc80fbd5 --- /dev/null +++ b/src/mongo/db/repl/vote_requester.cpp @@ -0,0 +1,156 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/vote_requester.h" + +#include "mongo/base/status.h" +#include "mongo/db/repl/repl_set_request_votes_args.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + + VoteRequester::Algorithm::Algorithm(const ReplicaSetConfig& rsConfig, + long long candidateId, + long long term, + OpTime lastOplogEntry) : + _rsConfig(rsConfig), + _candidateId(candidateId), + _term(term), + _lastOplogEntry(lastOplogEntry) { + + // populate targets with all voting members that aren't this node + for (auto member = _rsConfig.membersBegin(); member != _rsConfig.membersEnd(); member++) { + if (member->isVoter() && member->getId() != candidateId) { + _targets.push_back(member->getHostAndPort()); + } + } + } + + VoteRequester::Algorithm::~Algorithm() {} + + /** + * Builds a single replSetRequestVotes command (setName, term, candidateId, configVersion, + * lastCommittedOp) and addresses one copy of it to each host in _targets. + * + * The first field is the command name; it must be "replSetRequestVotes" rather than + * "replSetDeclareElectionWinner", because the remaining fields are vote-request arguments. + */ + std::vector<ReplicationExecutor::RemoteCommandRequest> + VoteRequester::Algorithm::getRequests() const { + BSONObjBuilder requestVotesCmdBuilder; + requestVotesCmdBuilder.append("replSetRequestVotes", 1); + requestVotesCmdBuilder.append("setName", _rsConfig.getReplSetName()); + requestVotesCmdBuilder.append("term", _term); + requestVotesCmdBuilder.append("candidateId", _candidateId); + requestVotesCmdBuilder.append("configVersion", _rsConfig.getConfigVersion()); + + BSONObjBuilder lastCommittedOp(requestVotesCmdBuilder.subobjStart("lastCommittedOp")); + lastCommittedOp.append("ts", _lastOplogEntry.getTimestamp()); + lastCommittedOp.append("term", _lastOplogEntry.getTerm()); + lastCommittedOp.done(); + + const BSONObj requestVotesCmd = requestVotesCmdBuilder.obj(); + + std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + for (const auto& target : _targets) { + 
requests.push_back(ReplicationExecutor::RemoteCommandRequest( + target, + "admin", + requestVotesCmd, + Milliseconds(30*1000))); // trying to match current Socket timeout + } + + return requests; + } + + void VoteRequester::Algorithm::processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response) { + _responsesProcessed++; + if (!response.isOK()) { // failed response + log() << "VoteRequester: Got failed response from " << request.target + << ": " << response.getStatus(); + } + else { + ReplSetRequestVotesResponse voteResponse; + voteResponse.initialize(response.getValue().data); + if (voteResponse.getVoteGranted()) { + _votes++; + } + else { + log() << "VoteRequester: Got no vote from " << request.target + << " because: " << voteResponse.getReason(); + } + + if (voteResponse.getTerm() > _term) { + _failed = true; + _status = {ErrorCodes::BadValue, + "running for a stale term"}; + } + } + + if (_responsesProcessed == static_cast<int>(_targets.size()) && // all responses obtained + !hasReceivedSufficientResponses()) { + _failed = true; + _status = {ErrorCodes::IllegalOperation, + "received insufficient votes"}; + } + } + + bool VoteRequester::Algorithm::hasReceivedSufficientResponses() const { + return _failed || _votes == _rsConfig.getMajorityVoteCount(); + } + + VoteRequester::VoteRequester() : _isCanceled(false) {} + VoteRequester::~VoteRequester() {} + + StatusWith<ReplicationExecutor::EventHandle> VoteRequester::start( + ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + long long candidateId, + long long term, + OpTime lastOplogEntry, + const stdx::function<void ()>& onCompletion) { + + _algorithm.reset(new Algorithm(rsConfig, + candidateId, + term, + lastOplogEntry)); + _runner.reset(new ScatterGatherRunner(_algorithm.get())); + return _runner->start(executor, onCompletion); + } + + void VoteRequester::cancel(ReplicationExecutor* executor) { + _isCanceled = true; + _runner->cancel(executor); + 
} + + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h new file mode 100644 index 00000000000..72c4f90c3f7 --- /dev/null +++ b/src/mongo/db/repl/vote_requester.h @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <memory> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_algorithm.h" +#include "mongo/stdx/functional.h" + +namespace mongo { + + class Status; + +namespace repl { + + class ScatterGatherRunner; + class ReplSetDeclareRequestVotesArgs; + + class VoteRequester { + MONGO_DISALLOW_COPYING(VoteRequester); + public: + + class Algorithm : public ScatterGatherAlgorithm { + public: + Algorithm(const ReplicaSetConfig& rsConfig, + long long candidateId, + long long term, + OpTime lastOplogEntry); + virtual ~Algorithm(); + virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual void processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response); + virtual bool hasReceivedSufficientResponses() const; + + /** + * Returns BadValue if the term for which we are running is stale, IllegalOperation if + * insufficient votes are received, and Status::OK if we've won the election. + * + * It is invalid to call this before hasReceivedSufficientResponses returns true. + */ + Status getStatus() const { return _status; } + + private: + const ReplicaSetConfig _rsConfig; + const long long _candidateId; + const long long _term; + const OpTime _lastOplogEntry; + std::vector<HostAndPort> _targets; + bool _failed = false; + long long _responsesProcessed = 0; + long long _votes = 1; + Status _status = Status::OK(); + }; + + VoteRequester(); + virtual ~VoteRequester(); + + /** + * Begins the process of sending replSetRequestVotes commands to all non-DOWN nodes + * in currentConfig, in attempt to receive sufficient votes to win the election. + * + * evh can be used to schedule a callback when the process is complete. 
+ * This function must be run in the executor, as it must be synchronous with the command + * callbacks that it schedules. + * If this function returns Status::OK(), evh is then guaranteed to be signaled. + **/ + StatusWith<ReplicationExecutor::EventHandle> start( + ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + long long candidateId, + long long term, + OpTime lastOplogEntry, + const stdx::function<void ()>& onCompletion = stdx::function<void ()>()); + + /** + * Informs the VoteRequester to cancel further processing. The "executor" + * argument must point to the same executor passed to "start()". + * + * Like start(), this method must run in the executor context. + */ + void cancel(ReplicationExecutor* executor); + + private: + std::unique_ptr<Algorithm> _algorithm; + std::unique_ptr<ScatterGatherRunner> _runner; + bool _isCanceled = false; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp new file mode 100644 index 00000000000..5a36015f8c6 --- /dev/null +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -0,0 +1,270 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/vote_requester.h" +#include "mongo/db/repl/network_interface_mock.h" +#include "mongo/db/repl/repl_set_request_votes_args.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/stdx/functional.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { +namespace { + using unittest::assertGet; + + using RemoteCommandRequest = ReplicationExecutor::RemoteCommandRequest; + + bool stringContains(const std::string &haystack, const std::string& needle) { + return haystack.find(needle) != std::string::npos; + } + + + class VoteRequesterTest : public mongo::unittest::Test { + public: + virtual void setUp() { + ReplicaSetConfig config; + ASSERT_OK(config.initialize( + BSON("_id" << "rs0" << + "version" << 2 << + "members" << BSON_ARRAY( + BSON("_id" << 0 << "host" << "host0") << + BSON("_id" << 1 << "host" << "host1") << + BSON("_id" << 2 << "host" << "host2") << + BSON("_id" << 3 << "host" << "host3" << "votes" << 0) << + BSON("_id" << 4 << "host" << "host4" << 
"votes" << 0))))); + ASSERT_OK(config.validate()); + long long candidateId = 0; + long long term = 2; + OpTime lastOplogEntry = OpTime(Timestamp(999,0), 1); + + _requester.reset(new VoteRequester::Algorithm(config, + candidateId, + term, + lastOplogEntry)); + } + + virtual void tearDown() { + _requester.reset(NULL); + } + + protected: + int64_t countLogLinesContaining(const std::string& needle) { + return std::count_if(getCapturedLogMessages().begin(), + getCapturedLogMessages().end(), + stdx::bind(stringContains, + stdx::placeholders::_1, + needle)); + } + + bool hasReceivedSufficientResponses() { + return _requester->hasReceivedSufficientResponses(); + } + + void processResponse(const RemoteCommandRequest& request, const ResponseStatus& response) { + _requester->processResponse(request, response); + } + + Status getStatus() { + return _requester->getStatus(); + } + + RemoteCommandRequest requestFrom(std::string hostname) { + return RemoteCommandRequest(HostAndPort(hostname), + "", // fields do not matter in VoteRequester + BSONObj(), + Milliseconds(0)); + } + + ResponseStatus badResponseStatus() { + return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch"); + } + + ResponseStatus votedYes() { + ReplSetRequestVotesResponse response; + response.setOk(true); + response.setVoteGranted(true); + response.setTerm(1); + return ResponseStatus(NetworkInterfaceMock::Response(response.toBSON(), + Milliseconds(10))); + } + + ResponseStatus votedNoBecauseConfigVersionDoesNotMatch() { + ReplSetRequestVotesResponse response; + response.setOk(true); + response.setVoteGranted(false); + response.setTerm(1); + response.setReason("candidate's config version differs from mine"); + return ResponseStatus(NetworkInterfaceMock::Response(response.toBSON(), + Milliseconds(10))); + } + + ResponseStatus votedNoBecauseSetNameDiffers() { + ReplSetRequestVotesResponse response; + response.setOk(true); + response.setVoteGranted(false); + response.setTerm(1); + 
response.setReason("candidate's set name differs from mine"); + return ResponseStatus(NetworkInterfaceMock::Response(response.toBSON(), + Milliseconds(10))); + } + + ResponseStatus votedNoBecauseLastOpTimeIsGreater() { + ReplSetRequestVotesResponse response; + response.setOk(true); + response.setVoteGranted(false); + response.setTerm(1); + response.setReason("candidate's data is staler than mine"); + return ResponseStatus(NetworkInterfaceMock::Response(response.toBSON(), + Milliseconds(10))); + } + + ResponseStatus votedNoBecauseTermIsGreater() { + ReplSetRequestVotesResponse response; + response.setOk(true); + response.setVoteGranted(false); + response.setTerm(3); + response.setReason("candidate's term is lower than mine"); + return ResponseStatus(NetworkInterfaceMock::Response(response.toBSON(), + Milliseconds(10))); + } + + ResponseStatus votedNoBecauseAlreadyVoted() { + ReplSetRequestVotesResponse response; + response.setOk(true); + response.setVoteGranted(false); + response.setTerm(2); + response.setReason("already voted for another candidate this term"); + return ResponseStatus(NetworkInterfaceMock::Response(response.toBSON(), + Milliseconds(10))); + } + + private: + std::unique_ptr<VoteRequester::Algorithm> _requester; + }; + + TEST_F(VoteRequesterTest, ImmediateGoodResponseWinElection) { + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_OK(getStatus()); + } + + TEST_F(VoteRequesterTest, BadConfigVersionWinElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedNoBecauseConfigVersionDoesNotMatch()); + ASSERT_FALSE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(1, countLogLinesContaining("Got no vote from host1")); + processResponse(requestFrom("host2"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_OK(getStatus()); + 
stopCapturingLogMessages(); + } + + TEST_F(VoteRequesterTest, SetNameDiffersWinElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedNoBecauseSetNameDiffers()); + ASSERT_FALSE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(1, countLogLinesContaining("Got no vote from host1")); + processResponse(requestFrom("host2"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_OK(getStatus()); + stopCapturingLogMessages(); + } + + TEST_F(VoteRequesterTest, LastOpTimeIsGreaterWinElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedNoBecauseLastOpTimeIsGreater()); + ASSERT_FALSE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(1, countLogLinesContaining("Got no vote from host1")); + processResponse(requestFrom("host2"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_OK(getStatus()); + stopCapturingLogMessages(); + } + + TEST_F(VoteRequesterTest, FailedToContactWinElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), badResponseStatus()); + ASSERT_FALSE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(1, countLogLinesContaining("Got failed response from host1")); + processResponse(requestFrom("host2"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_OK(getStatus()); + stopCapturingLogMessages(); + } + + TEST_F(VoteRequesterTest, AlreadyVotedWinElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedNoBecauseAlreadyVoted()); + ASSERT_FALSE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(1, countLogLinesContaining("Got no vote from host1")); + processResponse(requestFrom("host2"), votedYes()); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_OK(getStatus()); + 
stopCapturingLogMessages(); + } + + TEST_F(VoteRequesterTest, StaleTermLoseElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedNoBecauseTermIsGreater()); + ASSERT_EQUALS(1, countLogLinesContaining("Got no vote from host1")); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(getStatus().reason(), "running for a stale term"); + stopCapturingLogMessages(); + } + + TEST_F(VoteRequesterTest, NotEnoughVotesLoseElection) { + startCapturingLogMessages(); + ASSERT_FALSE(hasReceivedSufficientResponses()); + processResponse(requestFrom("host1"), votedNoBecauseSetNameDiffers()); + ASSERT_FALSE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(1, countLogLinesContaining("Got no vote from host1")); + processResponse(requestFrom("host2"), badResponseStatus()); + ASSERT_EQUALS(1, countLogLinesContaining("Got failed response from host2")); + ASSERT_TRUE(hasReceivedSufficientResponses()); + ASSERT_EQUALS(getStatus().reason(), "received insufficient votes"); + stopCapturingLogMessages(); + } + +} // namespace +} // namespace repl +} // namespace mongo |