summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <stewart@mysql.com>2005-07-22 20:29:25 +1000
committerunknown <stewart@mysql.com>2005-07-22 20:29:25 +1000
commitf4b560008328bdecd875da85a2ff882a185e8d94 (patch)
tree2e76db089b0c90e71a5f914ccc90b309cee75ee1 /ndb
parentef45f4b688b29d61cf7449dcfced91aa1dc6217b (diff)
downloadmariadb-git-f4b560008328bdecd875da85a2ff882a185e8d94.tar.gz
WL#2347 - Load independent heartbeats
Reset missed heartbeat count on receipt of signal from node. This fixes a bug where that under high network load, the heartbeat packets could be delayed, causing the appearance of node failure (due to lost heartbeats). ndb/include/kernel/NodeInfo.hpp: Add m_heartbeat_cnt to track missed heartbeats ndb/include/transporter/TransporterCallback.hpp: add prototype for transporter_recv_from() Called on receipt from a node. ndb/src/common/transporter/TransporterRegistry.cpp: Add calls to transporter_receive_from when data is received (before unpack) ndb/src/kernel/blocks/qmgr/Qmgr.hpp: remove NodeRec::alarmCount. missed heartbeat count now kept in NodeInfo ndb/src/kernel/blocks/qmgr/QmgrMain.cpp: Use NodeInfo::m_heartbeat_cnt for missed heartbeat count ndb/src/kernel/vm/TransporterCallback.cpp: add transporter_recv_from(), which is called on receipt of signals. It resets missed heartbeat count for that node. ndb/src/ndbapi/ClusterMgr.cpp: Use NodeInfo::m_heartbeat_cnt for missed heartbeat count ndb/src/ndbapi/ClusterMgr.hpp: Use NodeInfo::m_heartbeat_cnt instead of ClusterMgr::Node::hbSent for missed heartbeat count. We now use the same storage for API and Kernel heartbeats. Add ClusterMgr::hb_received(nodeId) to reset hbSent (as if we received a heartbeat, but callable from elsewhere - e.g. when signal received) ndb/src/ndbapi/TransporterFacade.cpp: Implement transporter_recv_from for ndbapi - which resets hbSent ndb/src/ndbapi/TransporterFacade.hpp: Add hb_received(nodeId)
Diffstat (limited to 'ndb')
-rw-r--r--ndb/include/kernel/NodeInfo.hpp2
-rw-r--r--ndb/include/transporter/TransporterCallback.hpp3
-rw-r--r--ndb/src/common/transporter/TransporterRegistry.cpp4
-rw-r--r--ndb/src/kernel/blocks/qmgr/Qmgr.hpp3
-rw-r--r--ndb/src/kernel/blocks/qmgr/QmgrMain.cpp53
-rw-r--r--ndb/src/kernel/vm/TransporterCallback.cpp6
-rw-r--r--ndb/src/ndbapi/ClusterMgr.cpp10
-rw-r--r--ndb/src/ndbapi/ClusterMgr.hpp10
-rw-r--r--ndb/src/ndbapi/TransporterFacade.cpp4
-rw-r--r--ndb/src/ndbapi/TransporterFacade.hpp9
10 files changed, 69 insertions, 35 deletions
diff --git a/ndb/include/kernel/NodeInfo.hpp b/ndb/include/kernel/NodeInfo.hpp
index 5377f001949..622185323a3 100644
--- a/ndb/include/kernel/NodeInfo.hpp
+++ b/ndb/include/kernel/NodeInfo.hpp
@@ -41,6 +41,7 @@ public:
Uint32 m_type; ///< Node type
Uint32 m_connectCount; ///< No of times connected
bool m_connected; ///< Node is connected
+ Uint32 m_heartbeat_cnt; ///< Missed heartbeats
friend NdbOut & operator<<(NdbOut&, const NodeInfo&);
};
@@ -52,6 +53,7 @@ NodeInfo::NodeInfo(){
m_signalVersion = 0;
m_type = INVALID;
m_connectCount = 0;
+ m_heartbeat_cnt= 0;
}
inline
diff --git a/ndb/include/transporter/TransporterCallback.hpp b/ndb/include/transporter/TransporterCallback.hpp
index 9f910f31728..f2432edd394 100644
--- a/ndb/include/transporter/TransporterCallback.hpp
+++ b/ndb/include/transporter/TransporterCallback.hpp
@@ -341,5 +341,8 @@ enum TransporterError {
*/
void
reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode);
+
+void
+transporter_recv_from(void* callbackObj, NodeId node);
#endif
diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp
index 60649665d4a..86bfa385c04 100644
--- a/ndb/src/common/transporter/TransporterRegistry.cpp
+++ b/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -918,6 +918,7 @@ TransporterRegistry::performReceive()
NodeId remoteNodeId;
Uint32 * readPtr;
Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
+ transporter_recv_from(callbackObj, remoteNodeId);
Uint32 szUsed = unpack(readPtr,
sz,
remoteNodeId,
@@ -953,6 +954,7 @@ TransporterRegistry::performReceive()
{
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
+ transporter_recv_from(callbackObj, nodeId);
Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
t->updateReceiveDataPtr(szUsed);
}
@@ -976,6 +978,7 @@ TransporterRegistry::performReceive()
{
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
+ transporter_recv_from(callbackObj, nodeId);
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(newPtr);
}
@@ -993,6 +996,7 @@ TransporterRegistry::performReceive()
{
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
+ transporter_recv_from(callbackObj, nodeId);
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(newPtr);
}
diff --git a/ndb/src/kernel/blocks/qmgr/Qmgr.hpp b/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
index e134609df0a..2f4fcd21460 100644
--- a/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
+++ b/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
@@ -118,8 +118,7 @@ public:
struct NodeRec {
UintR ndynamicId;
Phase phase;
- UintR alarmCount;
-
+
QmgrState sendPrepFailReqStatus;
QmgrState sendCommitFailReqStatus;
QmgrState sendPresToStatus;
diff --git a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
index 621ec70fbe1..d062f5afb7e 100644
--- a/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
+++ b/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
@@ -66,7 +66,7 @@ void Qmgr::execCM_HEARTBEAT(Signal* signal)
jamEntry();
hbNodePtr.i = signal->theData[0];
ptrCheckGuard(hbNodePtr, MAX_NDB_NODES, nodeRec);
- hbNodePtr.p->alarmCount = 0;
+ setNodeInfo(hbNodePtr.i).m_heartbeat_cnt= 0;
return;
}//Qmgr::execCM_HEARTBEAT()
@@ -1040,7 +1040,7 @@ void Qmgr::execCM_ADD(Signal* signal)
jam();
ndbrequire(addNodePtr.p->phase == ZSTARTING);
addNodePtr.p->phase = ZRUNNING;
- addNodePtr.p->alarmCount = 0;
+ setNodeInfo(addNodePtr.i).m_heartbeat_cnt= 0;
c_clusterNodes.set(addNodePtr.i);
findNeighbours(signal);
@@ -1078,7 +1078,7 @@ Qmgr::joinedCluster(Signal* signal, NodeRecPtr nodePtr){
* NODES IN THE CLUSTER.
*/
nodePtr.p->phase = ZRUNNING;
- nodePtr.p->alarmCount = 0;
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
findNeighbours(signal);
c_clusterNodes.set(nodePtr.i);
c_start.reset();
@@ -1299,7 +1299,7 @@ void Qmgr::findNeighbours(Signal* signal)
*---------------------------------------------------------------------*/
fnNodePtr.i = cneighbourl;
ptrCheckGuard(fnNodePtr, MAX_NDB_NODES, nodeRec);
- fnNodePtr.p->alarmCount = 0;
+ setNodeInfo(fnNodePtr.i).m_heartbeat_cnt= 0;
}//if
}//if
@@ -1347,8 +1347,8 @@ void Qmgr::initData(Signal* signal)
} else {
nodePtr.p->phase = ZAPI_INACTIVE;
}
-
- nodePtr.p->alarmCount = 0;
+
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
nodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;
nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
@@ -1550,18 +1550,18 @@ void Qmgr::checkHeartbeat(Signal* signal)
}//if
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
- nodePtr.p->alarmCount ++;
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt++;
ndbrequire(nodePtr.p->phase == ZRUNNING);
ndbrequire(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB);
- if(nodePtr.p->alarmCount > 2){
+ if(getNodeInfo(nodePtr.i).m_heartbeat_cnt > 2){
signal->theData[0] = NDB_LE_MissedHeartbeat;
signal->theData[1] = nodePtr.i;
- signal->theData[2] = nodePtr.p->alarmCount - 1;
+ signal->theData[2] = getNodeInfo(nodePtr.i).m_heartbeat_cnt - 1;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
}
- if (nodePtr.p->alarmCount > 4) {
+ if (getNodeInfo(nodePtr.i).m_heartbeat_cnt > 4) {
jam();
/**----------------------------------------------------------------------
* OUR LEFT NEIGHBOUR HAVE KEPT QUIET FOR THREE CONSECUTIVE HEARTBEAT
@@ -1593,16 +1593,16 @@ void Qmgr::apiHbHandlingLab(Signal* signal)
if (TnodePtr.p->phase == ZAPI_ACTIVE){
jam();
- TnodePtr.p->alarmCount ++;
+ setNodeInfo(TnodePtr.i).m_heartbeat_cnt++;
- if(TnodePtr.p->alarmCount > 2){
+ if(getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 2){
signal->theData[0] = NDB_LE_MissedHeartbeat;
signal->theData[1] = nodeId;
- signal->theData[2] = TnodePtr.p->alarmCount - 1;
+ signal->theData[2] = getNodeInfo(TnodePtr.i).m_heartbeat_cnt - 1;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
}
- if (TnodePtr.p->alarmCount > 4) {
+ if (getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 4) {
jam();
/*------------------------------------------------------------------*/
/* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS.
@@ -1634,16 +1634,17 @@ void Qmgr::checkStartInterface(Signal* signal)
ptrAss(nodePtr, nodeRec);
if (nodePtr.p->phase == ZFAIL_CLOSING) {
jam();
- nodePtr.p->alarmCount = nodePtr.p->alarmCount + 1;
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt++;
if (c_connectedNodes.get(nodePtr.i)){
jam();
/*-------------------------------------------------------------------*/
// We need to ensure that the connection is not restored until it has
// been disconnected for at least three seconds.
/*-------------------------------------------------------------------*/
- nodePtr.p->alarmCount = 0;
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
}//if
- if ((nodePtr.p->alarmCount > 3) && (nodePtr.p->failState == NORMAL)) {
+ if ((getNodeInfo(nodePtr.i).m_heartbeat_cnt > 3)
+ && (nodePtr.p->failState == NORMAL)) {
/**------------------------------------------------------------------
* WE HAVE DISCONNECTED THREE SECONDS AGO. WE ARE NOW READY TO
* CONNECT AGAIN AND ACCEPT NEW REGISTRATIONS FROM THIS NODE.
@@ -1659,18 +1660,18 @@ void Qmgr::checkStartInterface(Signal* signal)
nodePtr.p->phase = ZINIT;
}//if
- nodePtr.p->alarmCount = 0;
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
signal->theData[0] = 0;
signal->theData[1] = nodePtr.i;
sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA);
} else {
- if(((nodePtr.p->alarmCount + 1) % 60) == 0){
+ if(((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 60) == 0){
char buf[100];
BaseString::snprintf(buf, sizeof(buf),
"Failure handling of node %d has not completed in %d min."
" - state = %d",
nodePtr.i,
- (nodePtr.p->alarmCount + 1)/60,
+ (getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1)/60,
nodePtr.p->failState);
warningEvent(buf);
}
@@ -1718,7 +1719,7 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo)
* WE ONLY NEED TO SET PARAMETERS TO ENABLE A NEW CONNECTION IN A FEW
* SECONDS.
*-------------------------------------------------------------------------*/
- failedNodePtr.p->alarmCount = 0;
+ setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0];
@@ -1871,7 +1872,7 @@ void Qmgr::node_failed(Signal* signal, Uint16 aFailedNode)
/*---------------------------------------------------------------------*/
failedNodePtr.p->failState = NORMAL;
failedNodePtr.p->phase = ZFAIL_CLOSING;
- failedNodePtr.p->alarmCount = 0;
+ setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
CloseComReqConf * const closeCom =
(CloseComReqConf *)&signal->theData[0];
@@ -1965,8 +1966,8 @@ void Qmgr::execAPI_REGREQ(Signal* signal)
}
setNodeInfo(apiNodePtr.i).m_version = version;
-
- apiNodePtr.p->alarmCount = 0;
+
+ setNodeInfo(apiNodePtr.i).m_heartbeat_cnt= 0;
ApiRegConf * const apiRegConf = (ApiRegConf *)&signal->theData[0];
apiRegConf->qmgrRef = reference();
@@ -2484,7 +2485,7 @@ void Qmgr::execCOMMIT_FAILREQ(Signal* signal)
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
nodePtr.p->phase = ZFAIL_CLOSING;
nodePtr.p->failState = WAITING_FOR_NDB_FAILCONF;
- nodePtr.p->alarmCount = 0;
+ setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
c_clusterNodes.clear(nodePtr.i);
}//for
/*----------------------------------------------------------------------*/
@@ -2742,7 +2743,7 @@ void Qmgr::failReport(Signal* signal,
failedNodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;
failedNodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
failedNodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
- failedNodePtr.p->alarmCount = 0;
+ setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
if (aSendFailRep == ZTRUE) {
jam();
if (failedNodePtr.i != getOwnNodeId()) {
diff --git a/ndb/src/kernel/vm/TransporterCallback.cpp b/ndb/src/kernel/vm/TransporterCallback.cpp
index 0f292143c21..e5322edaecc 100644
--- a/ndb/src/kernel/vm/TransporterCallback.cpp
+++ b/ndb/src/kernel/vm/TransporterCallback.cpp
@@ -33,6 +33,7 @@
#include <NdbOut.hpp>
#include "DataBuffer.hpp"
+
/**
* The instance
*/
@@ -452,3 +453,8 @@ SignalLoggerManager::printSegmentedSection(FILE * output,
putc('\n', output);
}
+void
+transporter_recv_from(void * callbackObj, NodeId nodeId){
+ globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0;
+ return;
+}
diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp
index ef9367ef10e..42b7c5b115d 100644
--- a/ndb/src/ndbapi/ClusterMgr.cpp
+++ b/ndb/src/ndbapi/ClusterMgr.cpp
@@ -214,7 +214,7 @@ ClusterMgr::threadMain( ){
* It is now time to send a new Heartbeat
*/
if (theNode.hbCounter >= theNode.hbFrequency) {
- theNode.hbSent++;
+ theNode.m_info.m_heartbeat_cnt++;
theNode.hbCounter = 0;
}
@@ -231,7 +231,7 @@ ClusterMgr::threadMain( ){
theFacade.sendSignalUnCond(&signal, nodeId);
}//if
- if (theNode.hbSent == 4 && theNode.hbFrequency > 0){
+ if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){
reportNodeFailed(i);
}//if
}
@@ -337,7 +337,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
node.m_info.m_version);
}
-
+
node.m_state = apiRegConf->nodeState;
if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED ||
node.m_state.startLevel == NodeState::SL_SINGLEUSER)){
@@ -345,7 +345,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
} else {
set_node_alive(node, false);
}//if
- node.hbSent = 0;
+ node.m_info.m_heartbeat_cnt = 0;
node.hbCounter = 0;
if (node.m_info.m_type != NodeInfo::REP) {
node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
@@ -414,7 +414,7 @@ ClusterMgr::reportConnected(NodeId nodeId){
Node & theNode = theNodes[nodeId];
theNode.connected = true;
- theNode.hbSent = 0;
+ theNode.m_info.m_heartbeat_cnt = 0;
theNode.hbCounter = 0;
/**
diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp
index d75b820e9cb..da8f16d6789 100644
--- a/ndb/src/ndbapi/ClusterMgr.hpp
+++ b/ndb/src/ndbapi/ClusterMgr.hpp
@@ -73,12 +73,12 @@ public:
*/
Uint32 hbFrequency; // Heartbeat frequence
Uint32 hbCounter; // # milliseconds passed since last hb sent
- Uint32 hbSent; // # heartbeats sent (without answer)
};
const Node & getNodeInfo(NodeId) const;
Uint32 getNoOfConnectedNodes() const;
-
+ void hb_received(NodeId);
+
private:
Uint32 noOfAliveNodes;
Uint32 noOfConnectedNodes;
@@ -128,6 +128,12 @@ ClusterMgr::getNoOfConnectedNodes() const {
return noOfConnectedNodes;
}
+inline
+void
+ClusterMgr::hb_received(NodeId nodeId) {
+ theNodes[nodeId].m_info.m_heartbeat_cnt= 0;
+}
+
/*****************************************************************************/
/**
diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp
index 96f376db5a5..b143f1a9944 100644
--- a/ndb/src/ndbapi/TransporterFacade.cpp
+++ b/ndb/src/ndbapi/TransporterFacade.cpp
@@ -126,6 +126,10 @@ reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
//TransporterFacade::instance()->reportDisconnected(nodeId);
}
+void
+transporter_recv_from(void * callbackObj, NodeId nodeId){
+ ((TransporterFacade*)(callbackObj))->hb_received(nodeId);
+}
/****************************************************************************
*
diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp
index e74f4b51e00..fa070889dd9 100644
--- a/ndb/src/ndbapi/TransporterFacade.hpp
+++ b/ndb/src/ndbapi/TransporterFacade.hpp
@@ -114,6 +114,9 @@ public:
TransporterRegistry* get_registry() { return theTransporterRegistry;};
+ // heart beat received from a node (e.g. a signal came)
+ void hb_received(NodeId n);
+
private:
/**
* Send a signal unconditional of node status (used by ClusterMgr)
@@ -296,6 +299,12 @@ TransporterFacade::get_node_alive(NodeId n) const {
}
inline
+void
+TransporterFacade::hb_received(NodeId n) {
+ theClusterMgr->hb_received(n);
+}
+
+inline
bool
TransporterFacade::get_node_stopping(NodeId n) const {
const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);