/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,b
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/repl_set_health_poll_task.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/db/repl/connections.h"
#include "mongo/db/repl/heartbeat.h"
#include "mongo/db/repl/manager.h"
#include "mongo/db/repl/member.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_config.h"
namespace mongo {
int ReplSetHealthPollTask::s_try_offset = 0;
ReplSetHealthPollTask::ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
: h(hh), m(mm), tries(s_try_offset), threshold(15),
_timeout(ReplSetConfig::DEFAULT_HB_TIMEOUT) {
if (theReplSet) {
_timeout = theReplSet->config().getHeartbeatTimeout();
}
// doesn't need protection, all health tasks are created in a single thread
s_try_offset += 7;
}
void ReplSetHealthPollTask::doWork() {
if ( !theReplSet ) {
LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog;
return;
}
HeartbeatInfo mem = m;
HeartbeatInfo old = mem;
try {
BSONObj info;
int theirConfigVersion = -10000;
bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
// weight new ping with old pings
// on the first ping, just use the ping value
if (old.ping != 0) {
mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
}
if( ok ) {
up(info, mem);
}
else if (info["code"].numberInt() == ErrorCodes::Unauthorized ||
info["errmsg"].str() == "unauthorized") {
authIssue(mem);
}
else {
down(mem, info.getStringField("errmsg"));
}
}
catch (const DBException& e) {
log() << "replSet health poll task caught a DBException: " << e.what();
down(mem, e.what());
}
catch (const std::exception& e) {
log() << "replSet health poll task caught an exception: " << e.what();
down(mem, e.what());
}
m = mem;
theReplSet->mgr->send( stdx::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
static time_t last = 0;
time_t now = time(0);
bool changed = mem.changed(old);
if( changed ) {
if( old.hbstate != mem.hbstate )
log() << "replSet member " << h.toString() << " is now in state "
<< mem.hbstate.toString() << rsLog;
}
if( changed || now-last>4 ) {
last = now;
theReplSet->mgr->send( stdx::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
}
}
bool ReplSetHealthPollTask::tryHeartbeat(BSONObj* info, int* theirConfigVersion) {
bool ok = false;
try {
ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
h.toString(), *info, theReplSet->config().version,
*theirConfigVersion);
}
catch (DBException&) {
// don't do anything, ok is already false
}
return ok;
}
bool ReplSetHealthPollTask::_requestHeartbeat(HeartbeatInfo& mem,
BSONObj& info,
int& theirConfigVersion) {
{
ScopedConn conn(h.toString());
conn.setTimeout(_timeout);
if (tries++ % threshold == (threshold - 1)) {
conn.reconnect();
}
}
Timer timer;
time_t before = curTimeMicros64() / 1000000;
bool ok = tryHeartbeat(&info, &theirConfigVersion);
mem.ping = static_cast(timer.millis());
time_t totalSecs = mem.ping / 1000;
// if that didn't work and we have more time, lower timeout and try again
if (!ok && totalSecs < _timeout) {
log() << "replset info " << h.toString() << " heartbeat failed, retrying" << rsLog;
// lower timeout to remaining ping time
{
ScopedConn conn(h.toString());
conn.setTimeout(_timeout - totalSecs);
}
int checkpoint = timer.millis();
timer.reset();
ok = tryHeartbeat(&info, &theirConfigVersion);
mem.ping = static_cast(timer.millis());
totalSecs = (checkpoint + mem.ping)/1000;
// set timeout back to default
{
ScopedConn conn(h.toString());
conn.setTimeout(_timeout);
}
}
// we set this on any response - we don't get this far if
// couldn't connect because exception is thrown
time_t after = mem.lastHeartbeat = before + totalSecs;
if ( info["time"].isNumber() ) {
long long t = info["time"].numberLong();
if( t > after )
mem.skew = (int) (t - after);
else if( t < before )
mem.skew = (int) (t - before); // negative
}
else {
// it won't be there if remote hasn't initialized yet
if( info.hasElement("time") )
warning() << "heatbeat.time isn't a number: " << info << endl;
mem.skew = INT_MIN;
}
{
BSONElement state = info["state"];
if( state.ok() )
mem.hbstate = MemberState(state.Int());
}
if (info.hasField("stateDisagreement") && info["stateDisagreement"].trueValue()) {
log() << "replset info " << h.toString() << " thinks that we are down" << endl;
}
return ok;
}
void ReplSetHealthPollTask::authIssue(HeartbeatInfo& mem) {
mem.authIssue = true;
mem.hbstate = MemberState::RS_UNKNOWN;
// set health to 0 so that this doesn't count towards majority
mem.health = 0.0;
theReplSet->rmFromElectable(mem.id());
}
void ReplSetHealthPollTask::down(HeartbeatInfo& mem, string msg) {
// if we've received a heartbeat from this member within the last two seconds, don't
// change its state to down (if it's already down, leave it down since we don't have
// any info about it other than it's heartbeating us)
const Member* oldMemInfo = theReplSet->findById(mem.id());
if (oldMemInfo && oldMemInfo->hbinfo().lastHeartbeatRecv+2 >= time(0)) {
log() << "replset info " << h.toString()
<< " just heartbeated us, but our heartbeat failed: " << msg
<< ", not changing state" << rsLog;
// we don't update any of the heartbeat info, though, since we didn't get any info
// other than "not down" from having it heartbeat us
return;
}
mem.authIssue = false;
mem.health = 0.0;
mem.ping = 0;
if( mem.upSince || mem.downSince == 0 ) {
mem.upSince = 0;
mem.downSince = jsTime();
mem.hbstate = MemberState::RS_DOWN;
log() << "replSet info " << h.toString() << " is down (or slow to respond): "
<< msg << rsLog;
}
mem.lastHeartbeatMsg = msg;
theReplSet->rmFromElectable(mem.id());
}
void ReplSetHealthPollTask::up(const BSONObj& info, HeartbeatInfo& mem) {
HeartbeatInfo::numPings++;
mem.authIssue = false;
if( mem.upSince == 0 ) {
log() << "replSet member " << h.toString() << " is up" << rsLog;
mem.upSince = mem.lastHeartbeat;
}
mem.health = 1.0;
mem.lastHeartbeatMsg = info["hbmsg"].String();
if (info.hasElement("syncingTo")) {
mem.syncingTo = info["syncingTo"].String();
}
else {
// empty out syncingTo since they are no longer syncing to anyone
mem.syncingTo = "";
}
if( info.hasElement("opTime") )
mem.opTime = info["opTime"].Date();
// see if this member is in the electable set
if( info["e"].eoo() ) {
// for backwards compatibility
const Member *member = theReplSet->findById(mem.id());
if (member && member->config().potentiallyHot()) {
theReplSet->addToElectable(mem.id());
}
else {
theReplSet->rmFromElectable(mem.id());
}
}
// add this server to the electable set if it is within 10
// seconds of the latest optime we know of
else if( info["e"].trueValue() &&
mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
if (lastOp > 0 && mem.opTime >= lastOp - 10) {
theReplSet->addToElectable(mem.id());
}
}
else {
theReplSet->rmFromElectable(mem.id());
}
BSONElement cfg = info["config"];
if( cfg.ok() ) {
// received a new config
stdx::function f =
stdx::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
theReplSet->mgr->send(f);
}
if (info.hasElement("electionTime")) {
LOG(4) << "setting electionTime to " << info["electionTime"];
mem.electionTime = info["electionTime"].Date();
}
}
} // namespace mongo