path: root/src/mongo/db
diff options
Diffstat (limited to 'src/mongo/db')
5 files changed, 577 insertions, 5 deletions
diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h
index 41ceddfb081..ad7841edcb7 100644
--- a/src/mongo/db/repl/rs.h
+++ b/src/mongo/db/repl/rs.h
@@ -67,7 +67,6 @@ namespace replset {
extern bool replSet; // true if using repl sets
extern class ReplSet *theReplSet; // null until initialized
- extern int maxSyncSourceLagSecs;
class ReplSetCmdline;
diff --git a/src/mongo/db/repl/rs_sync.h b/src/mongo/db/repl/rs_sync.h
index f5df70b1b85..9bb87b59626 100644
--- a/src/mongo/db/repl/rs_sync.h
+++ b/src/mongo/db/repl/rs_sync.h
@@ -41,12 +41,10 @@
namespace mongo {
namespace replset {
- class BackgroundSyncInterface;
// TODO: move hbmsg into an error-keeping class (SERVER-4444)
void sethbmsg(const std::string& s, const int logLevel=0);
+ extern int maxSyncSourceLagSecs;
} // namespace replset
} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
new file mode 100644
index 00000000000..c384b27b8d5
--- /dev/null
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -0,0 +1,120 @@
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <string>
+#include "mongo/base/disallow_copying.h"
+#include "mongo/util/net/hostandport.h"
+namespace mongo {
+ class OpTime;
+namespace replset {
+ class HeartbeatInfo;
+ class Member;
+ struct MemberState;
+ typedef int Callback_t; // TBD
+ /**
+ * Replication Topology Coordinator interface.
+ *
+ * This object is responsible for managing the topology of the cluster.
+ * Tasks include consensus and leader election, chaining, and configuration management.
+ * Methods of this class should be non-blocking.
+ */
+ class TopologyCoordinator {
+ MONGO_DISALLOW_COPYING(TopologyCoordinator);
+ public:
+ virtual ~TopologyCoordinator() {}
+ // The optime of the last op actually applied to the data
+ virtual void setLastApplied(const OpTime& optime) = 0;
+ // The optime of the last op marked as committed by the leader
+ virtual void setCommitOkayThrough(const OpTime& optime) = 0;
+ // The optime of the last op received over the network from the sync source
+ virtual void setLastReceived(const OpTime& optime) = 0;
+ // The amount of time this node delays applying ops when acting as a secondary
+ virtual int getSelfSlaveDelay() const = 0;
+ // Flag determining if chaining secondaries is allowed
+ virtual bool getChainingAllowedFlag() const = 0;
+ // For use with w:majority write concern
+ virtual int getMajorityNumber() const = 0;
+ // ReplCoord needs to know the state to implement certain public functions
+ virtual MemberState getMemberState() const = 0;
+ // Looks up _syncSource's address and returns it, for use by the Applier
+ virtual HostAndPort getSyncSourceAddress() const = 0;
+ // Chooses and sets a new sync source, based on our current knowledge of the world
+ virtual void chooseNewSyncSource() = 0; // this is basically getMemberToSyncTo()
+ // Do not choose a member as a sync source for a while;
+ // call this when we have reason to believe it's a bad choice (do we need this?)
+ // (currently handled by _veto in rs_initialsync)
+ virtual void blacklistSyncSource(Member* member) = 0;
+ // Add function pointer to callback list; call function when config changes
+ // Applier needs to know when things like chainingAllowed or slaveDelay change.
+ // ReplCoord needs to know when things like the tag sets change.
+ virtual void registerConfigChangeCallback(Callback_t) = 0;
+ // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY
+ virtual void signalDrainComplete() = 0;
+ // election entry point
+ virtual void electSelf() = 0;
+ // produce a reply to a RAFT-style RequestVote RPC
+ virtual bool prepareRequestVoteResponse(const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result) = 0;
+ // produce a reply to a received electCmd
+ virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result) = 0;
+ // produce a reply to a heartbeat
+ virtual bool prepareHeartbeatResponse(const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result) = 0;
+ // update internal state with heartbeat response
+ virtual void updateHeartbeatInfo(const HeartbeatInfo& newInfo) = 0;
+ protected:
+ TopologyCoordinator() {}
+ };
+} // namespace replset
+} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
new file mode 100644
index 00000000000..4bbef351245
--- /dev/null
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -0,0 +1,267 @@
+ * Copyright 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/member.h"
+#include "mongo/db/repl/rs_sync.h" // maxSyncSourceLagSecs
+namespace mongo {
+namespace replset {
+ TopologyCoordinatorImpl::TopologyCoordinatorImpl() :
+ _majorityNumber(0) {
+ };
+ void TopologyCoordinatorImpl::setLastApplied(const OpTime& optime) {
+ _lastApplied = optime;
+ }
+ void TopologyCoordinatorImpl::setCommitOkayThrough(const OpTime& optime) {
+ _commitOkayThrough = optime;
+ }
+ void TopologyCoordinatorImpl::setLastReceived(const OpTime& optime) {
+ _lastReceived = optime;
+ }
+ int TopologyCoordinatorImpl::getSelfSlaveDelay() const {
+ invariant(_currentConfig.self);
+ return _currentConfig.self->slaveDelay;
+ }
+ bool TopologyCoordinatorImpl::getChainingAllowedFlag() const {
+ return _currentConfig.chainingAllowed;
+ }
+ int TopologyCoordinatorImpl::getMajorityNumber() const {
+ return _majorityNumber;
+ };
+ MemberState TopologyCoordinatorImpl::getMemberState() const {
+ // TODO
+ return MemberState();
+ }
+ void TopologyCoordinatorImpl::_calculateMajorityNumber() {
+ int total = _currentConfig.members.size();
+ int nonArbiters = total;
+ int strictMajority = total/2+1;
+ for (std::vector<MemberConfig>::iterator it = _currentConfig.members.begin();
+ it < _currentConfig.members.end();
+ it++) {
+ if ((*it).arbiterOnly) {
+ nonArbiters--;
+ }
+ }
+ // majority should be all "normal" members if we have something like 4
+ // arbiters & 3 normal members
+ _majorityNumber = (strictMajority > nonArbiters) ? nonArbiters : strictMajority;
+ }
+ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
+ return _syncSource->h();
+ }
+ void TopologyCoordinatorImpl::chooseNewSyncSource() {
+// const Member* ReplSetImpl::getMemberToSyncTo() {
+// lock lk(this);
+ // if we have a target we've requested to sync from, use it
+ This should be a HostAndPort.
+// TODO
+ if (_forceSyncTarget) {
+ Member* target = _forceSyncTarget;
+ _forceSyncTarget = 0;
+ sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0);
+ return target;
+ }
+ // wait for 2N pings before choosing a sync target
+ int needMorePings = _currentConfig.members.size()*2 - HeartbeatInfo::numPings;
+ if (needMorePings > 0) {
+ OCCASIONALLY log() << "waiting for " << needMorePings
+ << " pings from other members before syncing";
+ return;
+ }
+ // If we are only allowed to sync from the primary, set that
+ if (!_currentConfig.chainingAllowed) {
+ // Sets NULL if we cannot reach the primary
+ _syncSource = _currentPrimary;
+ }
+ // find the member with the lowest ping time that has more data than me
+ // Find primary's oplog time. Reject sync candidates that are more than
+ // maxSyncSourceLagSecs seconds behind.
+ OpTime primaryOpTime;
+ if (_currentPrimary)
+ primaryOpTime = _currentPrimary->hbinfo().opTime;
+ else
+ // choose a time that will exclude no candidates, since we don't see a primary
+ primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
+ if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) {
+ // erh - I think this means there was just a new election
+ // and we don't yet know the new primary's optime
+ primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
+ }
+ OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0);
+ Member *closest = 0;
+ time_t now = 0;
+ // Make two attempts. The first attempt, we ignore those nodes with
+ // slave delay higher than our own. The second attempt includes such
+ // nodes, in case those are the only ones we can reach.
+ // This loop attempts to set 'closest'.
+ for (int attempts = 0; attempts < 2; ++attempts) {
+ for (Member *m = _otherMembers.head(); m; m = m->next()) {
+ if (!m->syncable())
+ continue;
+ if (m->state() == MemberState::RS_SECONDARY) {
+ // only consider secondaries that are ahead of where we are
+ if (m->hbinfo().opTime <= _lastApplied)
+ continue;
+ // omit secondaries that are excessively behind, on the first attempt at least.
+ if (attempts == 0 &&
+ m->hbinfo().opTime < oldestSyncOpTime)
+ continue;
+ }
+ // omit nodes that are more latent than anything we've already considered
+ if (closest &&
+ (m->hbinfo().ping > closest->hbinfo().ping))
+ continue;
+ if (attempts == 0 &&
+ (_currentConfig.self->slaveDelay < m->config().slaveDelay
+ || m->config().hidden)) {
+ continue; // skip this one in the first attempt
+ }
+ map<string,time_t>::iterator vetoed = _syncSourceBlacklist.find(m->fullName());
+ if (vetoed != _syncSourceBlacklist.end()) {
+ // Do some veto housekeeping
+ if (now == 0) {
+ now = time(0);
+ }
+ // if this was on the veto list, check if it was vetoed in the last "while".
+ // if it was, skip.
+ if (vetoed->second >= now) {
+ if (time(0) % 5 == 0) {
+ log() << "replSet not trying to sync from " << (*vetoed).first
+ << ", it is vetoed for " << ((*vetoed).second - now)
+ << " more seconds" << rsLog;
+ }
+ continue;
+ }
+ _syncSourceBlacklist.erase(vetoed);
+ // fall through, this is a valid candidate now
+ }
+ // This candidate has passed all tests; set 'closest'
+ closest = m;
+ }
+ if (closest) break; // no need for second attempt
+ }
+ if (!closest) {
+ return;
+ }
+ sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);
+ _syncSource = closest;
+ }
+ void TopologyCoordinatorImpl::blacklistSyncSource(Member* member) {
+ // TODO
+ }
+ void TopologyCoordinatorImpl::registerConfigChangeCallback(Callback_t) {
+ // TODO
+ }
+ // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY
+ void TopologyCoordinatorImpl::signalDrainComplete() {
+ // TODO
+ }
+ // election entry point
+ void TopologyCoordinatorImpl::electSelf() {
+ // TODO
+ }
+ // produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command
+ bool TopologyCoordinatorImpl::prepareRequestVoteResponse(const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result) {
+ // TODO
+ return false;
+ }
+ // produce a reply to a recevied electCmd
+ void TopologyCoordinatorImpl::prepareElectCmdResponse(const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ // TODO
+ }
+ // produce a reply to a heartbeat
+ bool TopologyCoordinatorImpl::prepareHeartbeatResponse(const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result) {
+ // TODO
+ return false;
+ }
+ // update internal state with heartbeat response
+ void TopologyCoordinatorImpl::updateHeartbeatInfo(const HeartbeatInfo& newInfo) {
+ // TODO
+ }
+} // namespace replset
+} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
new file mode 100644
index 00000000000..0acd55d8ad9
--- /dev/null
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -0,0 +1,188 @@
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <string>
+#include <vector>
+#include "mongo/db/repl/topology_coordinator.h"
+#include "mongo/bson/optime.h"
+#include "mongo/db/repl/member_state.h"
+#include "mongo/util/concurrency/list.h"
+namespace mongo {
+namespace replset {
+ class TagSubgroup;
+ class TopologyCoordinatorImpl : public TopologyCoordinator {
+ public:
+ TopologyCoordinatorImpl();
+ virtual ~TopologyCoordinatorImpl() {};
+ virtual void setLastApplied(const OpTime& optime);
+ virtual void setCommitOkayThrough(const OpTime& optime);
+ virtual void setLastReceived(const OpTime& optime);
+ virtual int getSelfSlaveDelay() const;
+ virtual bool getChainingAllowedFlag() const;
+ // For use with w:majority write concern
+ virtual int getMajorityNumber() const;
+ // ReplCoord needs to know the state to implement certain public functions
+ virtual MemberState getMemberState() const;
+ // Looks up _syncSource's address and returns it, for use by the Applier
+ virtual HostAndPort getSyncSourceAddress() const;
+ // Chooses and sets a new sync source, based on our current knowledge of the world
+ virtual void chooseNewSyncSource(); // this is basically getMemberToSyncTo()
+ // Do not choose a member as a sync source for a while;
+ // call this when we have reason to believe it's a bad choice (do we need this?)
+ // (currently handled by _veto in rs_initialsync)
+ virtual void blacklistSyncSource(Member* member);
+ // Add function pointer to callback list; call function when config changes
+ // Applier needs to know when things like chainingAllowed or slaveDelay change.
+ // ReplCoord needs to know when things like the tag sets change.
+ virtual void registerConfigChangeCallback(Callback_t);
+ // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY
+ virtual void signalDrainComplete();
+ // election entry point
+ virtual void electSelf();
+ // produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command
+ virtual bool prepareRequestVoteResponse(const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result);
+ // produce a reply to a received electCmd
+ virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result);
+ // produce a reply to a heartbeat
+ virtual bool prepareHeartbeatResponse(const BSONObj& cmdObj,
+ std::string& errmsg,
+ BSONObjBuilder& result);
+ // update internal state with heartbeat response
+ virtual void updateHeartbeatInfo(const HeartbeatInfo& newInfo);
+ private:
+ void _calculateMajorityNumber(); // possibly not needed
+ OpTime _lastApplied; // the last op that the applier has actually written to the data
+ OpTime _commitOkayThrough; // the primary's latest op that won't get rolled back
+ OpTime _lastReceived; // the last op we have received from our sync source
+ MemberState _memberState;
+ int _majorityNumber; // for w:majority writes
+ // the member we currently believe is primary, if one exists
+ const Member *_currentPrimary;
+ // the member we are currently syncing from
+ // NULL if no sync source (we are primary, or we cannot connect to anyone yet)
+ const Member* _syncSource;
+ // These members are not chosen as sync sources for a period of time, due to connection
+ // issues with them
+ std::map<std::string, time_t> _syncSourceBlacklist;
+ class MemberConfig {
+ public:
+ MemberConfig() :
+ _id(-1),
+ votes(1),
+ priority(1.0),
+ arbiterOnly(false),
+ slaveDelay(0),
+ hidden(false),
+ buildIndexes(true) { }
+ int _id; /* ordinal */
+ unsigned votes; /* how many votes this node gets. default 1. */
+ HostAndPort h;
+ double priority; /* 0 means can never be primary */
+ bool arbiterOnly;
+ int slaveDelay; /* seconds. */
+ bool hidden; /* if set, don't advertise to drivers in isMaster. */
+ /* for non-primaries (priority 0) */
+ bool buildIndexes; /* if false, do not create any non-_id indexes */
+ std::map<std::string,std::string> tags; /* tagging for data center, rack, etc. */
+ private:
+ std::set<TagSubgroup*> _groups; // the subgroups this member belongs to
+ };
+ struct ReplicaSetConfig {
+ std::vector<MemberConfig> members;
+ std::string replSetName;
+ int version;
+ MemberConfig* self;
+ /**
+ * If replication can be chained. If chaining is disallowed, it can still be explicitly
+ * enabled via the replSetSyncFrom command, but it will not happen automatically.
+ */
+ bool chainingAllowed;
+ } _currentConfig;
+ Member* _self;
+ List1<Member> _otherMembers; // all members of the set EXCEPT _self.
+ // do these need settors? the current code has no way to change these values.
+ struct HeartbeatOptions {
+ HeartbeatOptions() : heartbeatSleepMillis(2000),
+ heartbeatTimeoutMillis(10000),
+ heartbeatConnRetries(2)
+ { }
+ unsigned heartbeatSleepMillis;
+ unsigned heartbeatTimeoutMillis;
+ unsigned heartbeatConnRetries ;
+ void check() {
+ uassert(17490, "bad replset heartbeat option", heartbeatSleepMillis >= 10);
+ uassert(17491, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10);
+ }
+ bool operator==(const HeartbeatOptions& r) const {
+ return (heartbeatSleepMillis==r.heartbeatSleepMillis &&
+ heartbeatTimeoutMillis==r.heartbeatTimeoutMillis &&
+ heartbeatConnRetries==r.heartbeatConnRetries);
+ }
+ } _heartbeatOptions;
+ };
+} // namespace replset
+} // namespace mongo