summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/heartbeat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/heartbeat.cpp')
-rw-r--r--src/mongo/db/repl/heartbeat.cpp382
1 files changed, 382 insertions, 0 deletions
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp
new file mode 100644
index 00000000000..331812af85a
--- /dev/null
+++ b/src/mongo/db/repl/heartbeat.cpp
@@ -0,0 +1,382 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "health.h"
+#include "../../util/background.h"
+#include "../../client/dbclient.h"
+#include "../commands.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/task.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/mongoutils/html.h"
+#include "../../util/goodies.h"
+#include "../../util/ramlog.h"
+#include "../helpers/dblogger.h"
+#include "connections.h"
+#include "../../util/unittest.h"
+#include "../instance.h"
+#include "../repl.h"
+
+namespace mongo {
+
+ using namespace bson;
+
+ extern bool replSetBlind; // defined elsewhere; when true this node refuses to answer heartbeats and does not send them (see run() and requestHeartbeat() below)
+ extern ReplSettings replSettings; // defined elsewhere; this file uses its discoveredSeeds set and the discoveredSeeds_mx mutex
+
+ unsigned int HeartbeatInfo::numPings; // definition of the static counter; incremented once per successful heartbeat in ReplSetHealthPollTask::up()
+
+ long long HeartbeatInfo::timeDown() const { // time this member has been down (jsTime() - downSince); 0 if it is up or has never been heard from
+ if( up() ) return 0; // currently up: no downtime to report
+ if( downSince == 0 )
+ return 0; // still waiting on first heartbeat
+ return jsTime() - downSince; // same units as jsTime() (presumably milliseconds -- TODO confirm)
+ }
+
+ /* { replSetHeartbeat : <setname> } -- command handler that answers a heartbeat request sent by another member (see requestHeartbeat() for the request fields). */
+ class CmdReplSetHeartbeat : public ReplSetCommand {
+ public:
+ CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { // fills result with this node's status; returns false (usually with errmsg set) when the request is refused
+ if( replSetBlind ) { // blind mode: refuse to answer
+ if (theReplSet) { // errmsg is only set when the set object exists, since it needs selfFullName()
+ errmsg = str::stream() << theReplSet->selfFullName() << " is blind";
+ }
+ return false;
+ }
+
+ /* we don't call ReplSetCommand::check() here because heartbeat
+ checks many things that are pre-initialization. */
+ if( !replSet ) {
+ errmsg = "not running with --replSet";
+ return false;
+ }
+
+ if (!checkAuth(errmsg, result)) {
+ return false;
+ }
+
+ /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */
+ {
+ AbstractMessagingPort *mp = cc().port(); // may be null, hence the check below
+ if( mp )
+ mp->tag |= 1; // set the low bit of the port's tag to mark it as a heartbeat connection
+ }
+
+ if( cmdObj["pv"].Int() != 1 ) { // only protocol version 1 is accepted
+ errmsg = "incompatible replset protocol version";
+ return false;
+ }
+ {
+ string s = string(cmdObj.getStringField("replSetHeartbeat")); // set name the caller sent
+ if( cmdLine.ourSetName() != s ) { // first name check: against our command-line set name
+ errmsg = "repl set names do not match";
+ log() << "replSet set names do not match, our cmdline: " << cmdLine._replSet << rsLog;
+ log() << "replSet s: " << s << rsLog;
+ result.append("mismatch", true); // flag the mismatch in the reply body as well as in errmsg
+ return false;
+ }
+ }
+
+ result.append("rs", true);
+ if( cmdObj["checkEmpty"].trueValue() ) { // caller asked whether we hold any databases
+ result.append("hasData", replHasDatabases());
+ }
+ if( theReplSet == 0 ) { // set object not created yet: remember the caller as a seed and report that we are still initializing
+ string from( cmdObj.getStringField("from") );
+ if( !from.empty() ) {
+ scoped_lock lck( replSettings.discoveredSeeds_mx ); // held while inserting into discoveredSeeds
+ replSettings.discoveredSeeds.insert(from);
+ }
+ result.append("hbmsg", "still initializing");
+ return true; // note: reply carries no "time"/"state" fields in this case
+ }
+
+ if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { // second name check: against the live set object's name
+ errmsg = "repl set names do not match (2)";
+ result.append("mismatch", true);
+ return false;
+ }
+ result.append("set", theReplSet->name());
+ result.append("state", theReplSet->state().s);
+ result.append("e", theReplSet->iAmElectable()); // read by the poller's up() to maintain its electable set
+ result.append("hbmsg", theReplSet->hbmsg());
+ result.append("time", (long long) time(0)); // our wall clock in seconds; the poller derives clock skew from it
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
+ int v = theReplSet->config().version;
+ result.append("v", v);
+ if( v > cmdObj["v"].Int() ) // our config version is newer than the caller's: ship the full config in the reply
+ result << "config" << theReplSet->config().asBson();
+
+ return true;
+ }
+ } cmdReplSetHeartbeat;
+
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, // sends a replSetHeartbeat command to memberFullName; the reply is stored in result
+ int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { // returns runCommand's result; theirCfgVersion is not assigned anywhere in this function
+ if( replSetBlind ) { // blind mode: do not send heartbeats
+ return false;
+ }
+
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName << // request fields match what CmdReplSetHeartbeat::run() reads
+ "v" << myCfgVersion <<
+ "pv" << 1 <<
+ "checkEmpty" << checkEmpty <<
+ "from" << from );
+
+ // generally not a great idea to do outbound waiting calls in a
+ // write lock. heartbeats can be slow (multisecond to respond), so
+ // generally we don't want to be locked, at least not without
+ // thinking carefully about it first.
+ uassert(15900, "can't heartbeat: too much lock", // fails only when the db mutex is write locked AND the set exists AND it is locked by this thread
+ !d.dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );
+
+ ScopedConn conn(memberFullName);
+ return conn.runCommand("admin", cmd, result, 0); // command is run against the "admin" database
+ }
+
+ /**
+ * Poll every other set member to check its status.
+ *
+ * A detail about local machines and authentication: suppose we have 2
+ * members, A and B, on the same machine using different keyFiles. A is
+ * primary. If we're just starting the set, there are no admin users, so A
+ * and B can access each other because it's local access.
+ *
+ * Then we add a user to A. B cannot sync this user from A, because as soon
+ * as we add an admin user, A requires auth. However, A can still
+ * heartbeat B, because B *doesn't* have an admin user. So A can reach B
+ * but B cannot reach A.
+ *
+ * Once B is restarted with the correct keyFile, everything should work as
+ * expected.
+ */
+ class ReplSetHealthPollTask : public task::Task {
+ private:
+ HostAndPort h; // the member this task polls
+ HeartbeatInfo m; // heartbeat state carried over from the previous round
+ int tries; // number of heartbeat requests attempted so far
+ const int threshold; // a reconnect is forced on every threshold-th request (15)
+ public:
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
+ : h(hh), m(mm), tries(0), threshold(15) { }
+
+ string name() const { return "rsHealthPoll"; }
+ void doWork() { // one poll round: heartbeat h, classify the result as up / auth issue / down, then notify the manager
+ if ( !theReplSet ) {
+ LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog;
+ return;
+ }
+
+ HeartbeatInfo mem = m; // working copy updated during this round
+ HeartbeatInfo old = mem; // untouched snapshot, used for ping smoothing and change detection
+ try {
+ BSONObj info;
+ int theirConfigVersion = -10000; // placeholder; nothing in this file assigns it a real value
+
+ bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
+
+ // weight new ping with old pings
+ // on the first ping, just use the ping value
+ if (old.ping != 0) {
+ mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); // 80% previous value, 20% new measurement
+ }
+
+ if( ok ) {
+ up(info, mem);
+ }
+ else if (!info["errmsg"].eoo() &&
+ info["errmsg"].str() == "need to login") { // the remote rejected us for authentication reasons
+ authIssue(mem);
+ }
+ else {
+ down(mem, info.getStringField("errmsg"));
+ }
+ }
+ catch(DBException& e) {
+ down(mem, e.what());
+ }
+ catch(...) {
+ down(mem, "replSet unexpected exception in ReplSetHealthPollTask");
+ }
+ m = mem; // keep the result for the next round
+
+ theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) ); // hand a copy of the new info to the manager
+
+ static time_t last = 0; // function-local static: one value shared by every ReplSetHealthPollTask instance; NOTE(review): no synchronization is visible here -- confirm that is safe
+ time_t now = time(0);
+ bool changed = mem.changed(old);
+ if( changed ) {
+ if( old.hbstate != mem.hbstate )
+ log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog;
+ }
+ if( changed || now-last>4 ) { // ask the manager to re-check state on any change, or if more than 4 seconds passed since the last request
+ last = now;
+ theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+
+ private:
+ bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) { // sends one heartbeat to h and records ping, lastHeartbeat, skew and hbstate in mem; returns requestHeartbeat()'s result
+ if (tries++ % threshold == (threshold - 1)) { // true on the 15th, 30th, ... call: force a reconnect first
+ ScopedConn conn(h.toString());
+ conn.reconnect();
+ }
+
+ Timer timer;
+ time_t before = curTimeMicros64() / 1000000; // local time in seconds just before the request
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
+ h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ mem.ping = (unsigned int)timer.millis(); // round-trip time in milliseconds
+
+ // we set this on any response - we don't get this far if
+ // couldn't connect because exception is thrown
+ time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); // local time in seconds when the reply arrived
+
+ if ( info["time"].isNumber() ) {
+ long long t = info["time"].numberLong(); // remote wall clock in seconds (the "time" field of the reply)
+ if( t > after ) // remote clock is ahead of the latest local time it could have matched
+ mem.skew = (int) (t - after);
+ else if( t < before ) // remote clock is behind the earliest local time it could have matched
+ mem.skew = (int) (t - before); // negative
+ } // NOTE(review): when before <= t <= after, skew keeps its previous value -- confirm that is intended
+ else {
+ // it won't be there if remote hasn't initialized yet
+ if( info.hasElement("time") )
+ warning() << "heatbeat.time isn't a number: " << info << endl;
+ mem.skew = INT_MIN; // marker value used when the reply has no numeric "time"
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() ) // only update hbstate when the reply actually carried a state
+ mem.hbstate = MemberState(state.Int());
+ }
+
+ return ok;
+ }
+
+ void authIssue(HeartbeatInfo& mem) { // the remote answered but refused us: treat it as unknown and unhealthy
+ mem.authIssue = true;
+ mem.hbstate = MemberState::RS_UNKNOWN;
+
+ // set health to 0 so that this doesn't count towards majority
+ mem.health = 0.0;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ void down(HeartbeatInfo& mem, string msg) { // marks the member as down; msg is stored as its last heartbeat message
+ mem.authIssue = false;
+ mem.health = 0.0;
+ mem.ping = 0;
+ if( mem.upSince || mem.downSince == 0 ) { // it was up until now, or no down time was recorded yet: stamp the time and log once
+ mem.upSince = 0;
+ mem.downSince = jsTime();
+ mem.hbstate = MemberState::RS_DOWN;
+ log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
+ }
+ mem.lastHeartbeatMsg = msg;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ void up(const BSONObj& info, HeartbeatInfo& mem) { // records a successful heartbeat: health, opTime, electable-set membership and any new config in the reply
+ HeartbeatInfo::numPings++;
+ mem.authIssue = false;
+
+ if( mem.upSince == 0 ) { // first success after being down or never seen
+ log() << "replSet member " << h.toString() << " is up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ // see if this member is in the electable set
+ if( info["e"].eoo() ) { // reply has no "e" field
+ // for backwards compatibility
+ const Member *member = theReplSet->findById(mem.id());
+ if (member && member->config().potentiallyHot()) {
+ theReplSet->addToElectable(mem.id());
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+ }
+ // add this server to the electable set if it is within 10
+ // seconds of the latest optime we know of
+ else if( info["e"].trueValue() &&
+ mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
+ unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
+ if (lastOp > 0 && mem.opTime >= lastOp - 10) { // NOTE(review): when this check fails the member is neither added nor removed -- confirm intended
+ theReplSet->addToElectable(mem.id());
+ }
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ be cfg = info["config"]; // the heartbeat handler in this file includes "config" only when its version is newer than the one we sent
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); // copy() so the bound object does not reference info's buffer
+ theReplSet->mgr->send(f);
+ }
+ }
+ };
+
+ void ReplSetImpl::endOldHealthTasks() { // halts every registered health poll task and empties the healthTasks set
+ unsigned sz = healthTasks.size(); // remembered only for the debug message below
+ for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ )
+ (*i)->halt();
+ healthTasks.clear(); // drops the pointers only; no delete happens in this function
+ if( sz )
+ DEV log() << "replSet debug: cleared old tasks " << sz << endl;
+ }
+
+ void ReplSetImpl::startHealthTaskFor(Member *m) { // creates and schedules a health poll task for member m
+ ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); // seeded with the member's host and current heartbeat info
+ healthTasks.insert(task); // tracked so endOldHealthTasks() can halt it later
+ task::repeat(task, 2000); // presumably re-runs doWork() every 2000 ms -- TODO confirm against task::repeat
+ }
+
+ void startSyncThread(); // defined elsewhere; entry point of the thread started in startThreads()
+
+ /** called during repl set startup. caller expects it to return fairly quickly.
+ note ReplSet object is only created once we get a config - so this won't run
+ until the initiation.
+ */
+ void ReplSetImpl::startThreads() {
+ task::fork(mgr); // start the manager task
+ mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); // queue an initial state check; note the bound target is theReplSet->mgr, not this->mgr
+
+ boost::thread t(startSyncThread); // the thread object is a local and is not joined in this function
+
+ task::fork(ghost);
+
+ // member heartbeats are started in ReplSetImpl::initFromConfig
+ }
+
+}
+
+/* todo:
+ stop bg job and delete on removefromset
+*/