diff options
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 120 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 267 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 188 |
6 files changed, 578 insertions, 5 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 0178094175a..d8ceeb50ff0 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -585,6 +585,7 @@ serverOnlyFiles = [ "db/curop.cpp", "db/repl/repl_reads_ok.cpp", "db/repl/resync.cpp", "db/repl/oplog.cpp", + "db/repl/topology_coordinator_impl.cpp", "db/prefetch.cpp", "db/repl/write_concern.cpp", "db/index_legacy.cpp", diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h index 41ceddfb081..ad7841edcb7 100644 --- a/src/mongo/db/repl/rs.h +++ b/src/mongo/db/repl/rs.h @@ -67,7 +67,6 @@ namespace replset { extern bool replSet; // true if using repl sets extern class ReplSet *theReplSet; // null until initialized - extern int maxSyncSourceLagSecs; class ReplSetCmdline; diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h index f5df70b1b85..9bb87b59626 100644 --- a/src/mongo/db/repl/rs_sync.h +++ b/src/mongo/db/repl/rs_sync.h @@ -41,12 +41,10 @@ namespace mongo { namespace replset { - - class BackgroundSyncInterface; - - // TODO: move hbmsg into an error-keeping class (SERVER-4444) void sethbmsg(const std::string& s, const int logLevel=0); + extern int maxSyncSourceLagSecs; + } // namespace replset } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h new file mode 100644 index 00000000000..c384b27b8d5 --- /dev/null +++ b/src/mongo/db/repl/topology_coordinator.h @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { + + class OpTime; + +namespace replset { + + class HeartbeatInfo; + class Member; + struct MemberState; + + typedef int Callback_t; // TBD + + + /** + * Replication Topology Coordinator interface. + * + * This object is responsible for managing the topology of the cluster. + * Tasks include consensus and leader election, chaining, and configuration management. + * Methods of this class should be non-blocking. + */ + + + class TopologyCoordinator { + MONGO_DISALLOW_COPYING(TopologyCoordinator); + public: + virtual ~TopologyCoordinator() {} + + // The optime of the last op actually applied to the data + virtual void setLastApplied(const OpTime& optime) = 0; + // The optime of the last op marked as committed by the leader + virtual void setCommitOkayThrough(const OpTime& optime) = 0; + // The optime of the last op received over the network from the sync source + virtual void setLastReceived(const OpTime& optime) = 0; + + // The amount of time this node delays applying ops when acting as a secondary + virtual int getSelfSlaveDelay() const = 0; + // Flag determining if chaining secondaries is allowed + virtual bool getChainingAllowedFlag() const = 0; + + // For use with w:majority write concern + virtual int getMajorityNumber() const = 0; + + // ReplCoord needs to know the state to implement certain public functions + virtual MemberState getMemberState() const = 0; + + // Looks up _syncSource's address and returns it, for use by the Applier + virtual HostAndPort getSyncSourceAddress() const = 0; + // Chooses and sets a new sync source, based on our current knowledge of the world + virtual void chooseNewSyncSource() = 0; // this is basically getMemberToSyncTo() + // Do not choose a member as a sync source for a while; + // call this when we have reason to believe it's a bad choice (do we need this?) + // (currently handled by _veto in rs_initialsync) + virtual void blacklistSyncSource(Member* member) = 0; + + // Add function pointer to callback list; call function when config changes + // Applier needs to know when things like chainingAllowed or slaveDelay change. + // ReplCoord needs to know when things like the tag sets change. + virtual void registerConfigChangeCallback(Callback_t) = 0; + + // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY + virtual void signalDrainComplete() = 0; + + // election entry point + virtual void electSelf() = 0; + + // produce a reply to a RAFT-style RequestVote RPC + virtual bool prepareRequestVoteResponse(const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) = 0; + + // produce a reply to a received electCmd + virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result) = 0; + + // produce a reply to a heartbeat + virtual bool prepareHeartbeatResponse(const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) = 0; + + // update internal state with heartbeat response + virtual void updateHeartbeatInfo(const HeartbeatInfo& newInfo) = 0; + protected: + TopologyCoordinator() {} + }; +} // namespace replset +} // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp new file mode 100644 index 00000000000..4bbef351245 --- /dev/null +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -0,0 +1,267 @@ +/** + * Copyright 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/topology_coordinator_impl.h" + +#include "mongo/db/repl/member.h" +#include "mongo/db/repl/rs_sync.h" // maxSyncSourceLagSecs + +namespace mongo { +namespace replset { + + TopologyCoordinatorImpl::TopologyCoordinatorImpl() : + _majorityNumber(0) { + }; + + void TopologyCoordinatorImpl::setLastApplied(const OpTime& optime) { + _lastApplied = optime; + } + + void TopologyCoordinatorImpl::setCommitOkayThrough(const OpTime& optime) { + _commitOkayThrough = optime; + } + + void TopologyCoordinatorImpl::setLastReceived(const OpTime& optime) { + _lastReceived = optime; + } + + int TopologyCoordinatorImpl::getSelfSlaveDelay() const { + invariant(_currentConfig.self); + return _currentConfig.self->slaveDelay; + } + + bool TopologyCoordinatorImpl::getChainingAllowedFlag() const { + return _currentConfig.chainingAllowed; + } + + int TopologyCoordinatorImpl::getMajorityNumber() const { + return _majorityNumber; + }; + + MemberState TopologyCoordinatorImpl::getMemberState() const { + // TODO + return MemberState(); + } + + void TopologyCoordinatorImpl::_calculateMajorityNumber() { + int total = _currentConfig.members.size(); + int nonArbiters = total; + int strictMajority = total/2+1; + + for (std::vector<MemberConfig>::iterator it = _currentConfig.members.begin(); + it < _currentConfig.members.end(); + it++) { + if ((*it).arbiterOnly) { + nonArbiters--; + } + } + + // majority should be all "normal" members if we have something like 4 + // arbiters & 3 normal members + _majorityNumber = (strictMajority > nonArbiters) ? nonArbiters : strictMajority; + + } + + HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { + return _syncSource->h(); + } + + void TopologyCoordinatorImpl::chooseNewSyncSource() { +// const Member* ReplSetImpl::getMemberToSyncTo() { +// lock lk(this); + + // if we have a target we've requested to sync from, use it + +/* + This should be a HostAndPort. +*/ +// TODO +/* + if (_forceSyncTarget) { + Member* target = _forceSyncTarget; + _forceSyncTarget = 0; + sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0); + return target; + } +*/ + + // wait for 2N pings before choosing a sync target + int needMorePings = _currentConfig.members.size()*2 - HeartbeatInfo::numPings; + + if (needMorePings > 0) { + OCCASIONALLY log() << "waiting for " << needMorePings + << " pings from other members before syncing"; + return; + } + + // If we are only allowed to sync from the primary, set that + if (!_currentConfig.chainingAllowed) { + // Sets NULL if we cannot reach the primary + _syncSource = _currentPrimary; + } + + // find the member with the lowest ping time that has more data than me + + // Find primary's oplog time. Reject sync candidates that are more than + // maxSyncSourceLagSecs seconds behind. + OpTime primaryOpTime; + if (_currentPrimary) + primaryOpTime = _currentPrimary->hbinfo().opTime; + else + // choose a time that will exclude no candidates, since we don't see a primary + primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); + + if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) { + // erh - I think this means there was just a new election + // and we don't yet know the new primary's optime + primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); + } + + OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0); + + Member *closest = 0; + time_t now = 0; + + // Make two attempts. The first attempt, we ignore those nodes with + // slave delay higher than our own. The second attempt includes such + // nodes, in case those are the only ones we can reach. + // This loop attempts to set 'closest'. + for (int attempts = 0; attempts < 2; ++attempts) { + for (Member *m = _otherMembers.head(); m; m = m->next()) { + if (!m->syncable()) + continue; + + if (m->state() == MemberState::RS_SECONDARY) { + // only consider secondaries that are ahead of where we are + if (m->hbinfo().opTime <= _lastApplied) + continue; + // omit secondaries that are excessively behind, on the first attempt at least. + if (attempts == 0 && + m->hbinfo().opTime < oldestSyncOpTime) + continue; + } + + // omit nodes that are more latent than anything we've already considered + if (closest && + (m->hbinfo().ping > closest->hbinfo().ping)) + continue; + + if (attempts == 0 && + (_currentConfig.self->slaveDelay < m->config().slaveDelay + || m->config().hidden)) { + continue; // skip this one in the first attempt + } + + map<string,time_t>::iterator vetoed = _syncSourceBlacklist.find(m->fullName()); + if (vetoed != _syncSourceBlacklist.end()) { + // Do some veto housekeeping + if (now == 0) { + now = time(0); + } + + // if this was on the veto list, check if it was vetoed in the last "while". + // if it was, skip. + if (vetoed->second >= now) { + if (time(0) % 5 == 0) { + log() << "replSet not trying to sync from " << (*vetoed).first + << ", it is vetoed for " << ((*vetoed).second - now) + << " more seconds" << rsLog; + } + continue; + } + _syncSourceBlacklist.erase(vetoed); + // fall through, this is a valid candidate now + } + // This candidate has passed all tests; set 'closest' + closest = m; + } + if (closest) break; // no need for second attempt + } + + if (!closest) { + return; + } + + sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); + _syncSource = closest; + } + + void TopologyCoordinatorImpl::blacklistSyncSource(Member* member) { + // TODO + + } + + void TopologyCoordinatorImpl::registerConfigChangeCallback(Callback_t) { + // TODO + + } + + // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY + void TopologyCoordinatorImpl::signalDrainComplete() { + // TODO + + } + + // election entry point + void TopologyCoordinatorImpl::electSelf() { + // TODO + + } + + // produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command + bool TopologyCoordinatorImpl::prepareRequestVoteResponse(const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) { + // TODO + return false; + } + + // produce a reply to a recevied electCmd + void TopologyCoordinatorImpl::prepareElectCmdResponse(const BSONObj& cmdObj, + BSONObjBuilder& result) { + // TODO + + } + + // produce a reply to a heartbeat + bool TopologyCoordinatorImpl::prepareHeartbeatResponse(const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result) { + // TODO + return false; + } + + // update internal state with heartbeat response + void TopologyCoordinatorImpl::updateHeartbeatInfo(const HeartbeatInfo& newInfo) { + // TODO + + } + + +} // namespace replset +} // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h new file mode 100644 index 00000000000..0acd55d8ad9 --- /dev/null +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/db/repl/topology_coordinator.h" +#include "mongo/bson/optime.h" +#include "mongo/db/repl/member_state.h" +#include "mongo/util/concurrency/list.h" + +namespace mongo { +namespace replset { + + class TagSubgroup; + + class TopologyCoordinatorImpl : public TopologyCoordinator { + public: + TopologyCoordinatorImpl(); + virtual ~TopologyCoordinatorImpl() {}; + + virtual void setLastApplied(const OpTime& optime); + virtual void setCommitOkayThrough(const OpTime& optime); + virtual void setLastReceived(const OpTime& optime); + + virtual int getSelfSlaveDelay() const; + virtual bool getChainingAllowedFlag() const; + + // For use with w:majority write concern + virtual int getMajorityNumber() const; + + // ReplCoord needs to know the state to implement certain public functions + virtual MemberState getMemberState() const; + + // Looks up _syncSource's address and returns it, for use by the Applier + virtual HostAndPort getSyncSourceAddress() const; + // Chooses and sets a new sync source, based on our current knowledge of the world + virtual void chooseNewSyncSource(); // this is basically getMemberToSyncTo() + // Do not choose a member as a sync source for a while; + // call this when we have reason to believe it's a bad choice (do we need this?) + // (currently handled by _veto in rs_initialsync) + virtual void blacklistSyncSource(Member* member); + + // Add function pointer to callback list; call function when config changes + // Applier needs to know when things like chainingAllowed or slaveDelay change. + // ReplCoord needs to know when things like the tag sets change. + virtual void registerConfigChangeCallback(Callback_t); + + // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY + virtual void signalDrainComplete(); + + // election entry point + virtual void electSelf(); + + // produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command + virtual bool prepareRequestVoteResponse(const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result); + + // produce a reply to a received electCmd + virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result); + + // produce a reply to a heartbeat + virtual bool prepareHeartbeatResponse(const BSONObj& cmdObj, + std::string& errmsg, + BSONObjBuilder& result); + + // update internal state with heartbeat response + virtual void updateHeartbeatInfo(const HeartbeatInfo& newInfo); + + private: + + void _calculateMajorityNumber(); // possibly not needed + + OpTime _lastApplied; // the last op that the applier has actually written to the data + OpTime _commitOkayThrough; // the primary's latest op that won't get rolled back + OpTime _lastReceived; // the last op we have received from our sync source + + + MemberState _memberState; + int _majorityNumber; // for w:majority writes + + // the member we currently believe is primary, if one exists + const Member *_currentPrimary; + // the member we are currently syncing from + // NULL if no sync source (we are primary, or we cannot connect to anyone yet) + const Member* _syncSource; + // These members are not chosen as sync sources for a period of time, due to connection + // issues with them + std::map<std::string, time_t> _syncSourceBlacklist; + + + class MemberConfig { + public: + MemberConfig() : + _id(-1), + votes(1), + priority(1.0), + arbiterOnly(false), + slaveDelay(0), + hidden(false), + buildIndexes(true) { } + int _id; /* ordinal */ + unsigned votes; /* how many votes this node gets. default 1. */ + HostAndPort h; + double priority; /* 0 means can never be primary */ + bool arbiterOnly; + int slaveDelay; /* seconds. */ + bool hidden; /* if set, don't advertise to drivers in isMaster. */ + /* for non-primaries (priority 0) */ + bool buildIndexes; /* if false, do not create any non-_id indexes */ + std::map<std::string,std::string> tags; /* tagging for data center, rack, etc. */ + private: + std::set<TagSubgroup*> _groups; // the subgroups this member belongs to + }; + + struct ReplicaSetConfig { + std::vector<MemberConfig> members; + std::string replSetName; + int version; + MemberConfig* self; + + /** + * If replication can be chained. If chaining is disallowed, it can still be explicitly + * enabled via the replSetSyncFrom command, but it will not happen automatically. + */ + bool chainingAllowed; + + } _currentConfig; + + Member* _self; + List1<Member> _otherMembers; // all members of the set EXCEPT _self. + + // do these need settors? the current code has no way to change these values. + struct HeartbeatOptions { + HeartbeatOptions() : heartbeatSleepMillis(2000), + heartbeatTimeoutMillis(10000), + heartbeatConnRetries(2) + { } + + unsigned heartbeatSleepMillis; + unsigned heartbeatTimeoutMillis; + unsigned heartbeatConnRetries ; + + void check() { + uassert(17490, "bad replset heartbeat option", heartbeatSleepMillis >= 10); + uassert(17491, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10); + } + + bool operator==(const HeartbeatOptions& r) const { + return (heartbeatSleepMillis==r.heartbeatSleepMillis && + heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && + heartbeatConnRetries==r.heartbeatConnRetries); + } + } _heartbeatOptions; + + + }; + +} // namespace replset +} // namespace mongo |