/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */

/*
 * NOTE(review): this file reached review with its line structure collapsed,
 * and the text between '<' and '>' characters appears to have been stripped.
 * As a result the bare "#include" directives below have lost their header
 * names, and two code spans further down (in execNODE_FAILREP, and from
 * inside reportNodeFailed onward) are missing text.  Those spots are marked
 * "NOTE(review): text lost"; restore them from version control before
 * building.
 */
#include
#include
#include
#include
#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include
#include "NdbApiSignal.hpp"
#include "API.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
#include

/**
 * One-shot request for an immediate heartbeat round.  ClusterMgr::threadMain
 * samples this at the start of every pass, clears it, and if it was non-zero
 * sends API_REGREQ to every connected, compatible node in that pass.
 * Nothing visible in this file sets it non-zero; it is presumably set by
 * other code.
 */
int global_flag_send_heartbeat_now= 0;

/**
 * Just a C wrapper for threadMain.
 *
 * C-linkage thread entry point passed to NdbThread_Create() by
 * ClusterMgr::startThread(); `me` is the ClusterMgr instance.  Runs the
 * heartbeat loop until doStop() sets theStop, then exits the thread.
 */
extern "C"
void*
runClusterMgr_C(void * me)
{
  ((ClusterMgr*) me)->threadMain();
  /**
   * Sleep to allow another thread that is not exiting to take control
   * of signals allocated by this thread
   *
   * see Ndb::~Ndb() in Ndbinit.cpp
   */
#ifdef NDB_OSE
  NdbSleep_MilliSleep(50);
#endif
  NdbThread_Exit(0);
  return NULL;
}

// Defined elsewhere with C linkage; called from the ClusterMgr constructor.
extern "C" {
  void ndbSetOwnVersion();
}

/**
 * Creates the mutex that serialises startThread()/doStop() and resets the
 * connection count.  The heartbeat thread is NOT started here; call
 * startThread() for that.
 */
ClusterMgr::ClusterMgr(TransporterFacade & _facade):
  theStop(0),
  theFacade(_facade)
{
  ndbSetOwnVersion();
  clusterMgrThreadMutex = NdbMutex_Create();
  noOfConnectedNodes= 0;
  theClusterMgrThread= 0;
}

/**
 * Stops (joins) the heartbeat thread if it is running, then destroys the
 * start/stop mutex.
 */
ClusterMgr::~ClusterMgr(){
  doStop();
  NdbMutex_Destroy(clusterMgrThreadMutex);
}

/**
 * Populates theNodes from the cluster configuration.
 *
 * For every configuration section that has a CFG_NODE_ID, marks that node
 * as defined; if the section also has CFG_TYPE_OF_SECTION, records the node
 * type (EXT_REP is stored as NodeInfo::REP).  NODE_TYPE_EXT_REP nodes also
 * get a fixed heartbeat interval of 10000 because the configuration lookup
 * for it is commented out.  Sections of unknown type end up with only
 * `defined` set.
 *
 * NOTE(review): the node id read from the configuration indexes theNodes
 * without a bounds check here; this relies on the configuration containing
 * only valid node ids.
 */
void
ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
  for(iter.first(); iter.valid(); iter.next()){
    Uint32 tmp = 0;
    if(iter.get(CFG_NODE_ID, &tmp))
      continue;

    theNodes[tmp].defined = true;
#if 0
    ndbout << "--------------------------------------" << endl;
    ndbout << "--------------------------------------" << endl;
    ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
#endif

    unsigned type;
    if(iter.get(CFG_TYPE_OF_SECTION, &type))
      continue;

    switch(type){
    case NODE_TYPE_DB:
      theNodes[tmp].m_info.m_type = NodeInfo::DB;
      break;
    case NODE_TYPE_API:
      theNodes[tmp].m_info.m_type = NodeInfo::API;
      break;
    case NODE_TYPE_MGM:
      theNodes[tmp].m_info.m_type = NodeInfo::MGM;
      break;
    case NODE_TYPE_REP:
      theNodes[tmp].m_info.m_type = NodeInfo::REP;
      break;
    case NODE_TYPE_EXT_REP:
      theNodes[tmp].m_info.m_type = NodeInfo::REP;
      {
	Uint32 hbFreq = 10000;
	//ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq);
	theNodes[tmp].hbFrequency = hbFreq;
	// hbFrequency is compared with hbCounter, which threadMain accumulates
	// in milliseconds; the assert keeps it within [100 ms, 1 h).
	assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000);
      }
      break;
    default:
      // Self-assignment with no effect; presumably only there to avoid an
      // empty default case / unused-variable warning while the log below
      // is compiled out.
      type = type;
#if 0
      ndbout_c("ClusterMgr: Unknown node type: %d", type);
#endif
    }
  }
}

/**
 * Clears the stop flag and spawns the heartbeat thread ("ndb_clustermgr",
 * low priority) running runClusterMgr_C(this).  The handle is stored in
 * theClusterMgrThread so doStop() can join it.
 *
 * NOTE(review): calling this again without an intervening doStop()
 * overwrites theClusterMgrThread and loses the first thread's handle.
 */
void
ClusterMgr::startThread() {
  NdbMutex_Lock(clusterMgrThreadMutex);

  theStop = 0;

  theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
                                         (void**)this,
                                         32768,
                                         "ndb_clustermgr",
                                         NDB_THREAD_PRIO_LOW);

  NdbMutex_Unlock(clusterMgrThreadMutex);
}

/**
 * Stops the heartbeat thread.
 *
 * Safe to call more than once, and it is also called from the destructor:
 * if theStop is already set it returns immediately.  Otherwise it sets
 * theStop, waits for the thread (if one was started) to finish, and
 * releases the thread handle.  clusterMgrThreadMutex is held for the whole
 * join.  The join can take about one loop period (~100 ms), because
 * threadMain only re-checks theStop after its sleep.
 *
 * NOTE(review): threadMain() polls theStop without taking this mutex, so
 * shutdown relies on that plain flag write becoming visible to the
 * heartbeat thread; confirm theStop's declaration makes that safe.
 */
void
ClusterMgr::doStop( ){
  DBUG_ENTER("ClusterMgr::doStop");
  NdbMutex_Lock(clusterMgrThreadMutex);
  if(theStop){
    NdbMutex_Unlock(clusterMgrThreadMutex);
    DBUG_VOID_RETURN;
  }
  void *status;
  theStop = 1;
  if (theClusterMgrThread) {
    NdbThread_WaitFor(theClusterMgrThread, &status);
    NdbThread_Destroy(&theClusterMgrThread);
    theClusterMgrThread= 0;
  }
  NdbMutex_Unlock(clusterMgrThreadMutex);
  DBUG_VOID_RETURN;
}

/**
 * Heartbeat loop run by the ClusterMgr thread until theStop is set.
 *
 * A single API_REGREQ signal, addressed to QMGR and carrying our reference
 * and NDB_VERSION, is built once and reused.  Each pass holds the facade
 * mutex and visits every defined node:
 *  - not connected: ask the facade to connect, then move on;
 *  - connected but incompatible: skipped;
 *  - otherwise the measured sleep time (ms) is added to hbCounter, and
 *    API_REGREQ is sent when hbCounter reaches hbFrequency, or when
 *    global_flag_send_heartbeat_now was set at the start of this pass.
 *    Only the interval-driven send increments hbSent and resets hbCounter.
 *
 * A node with 4 unanswered heartbeats and a non-zero hbFrequency is
 * reported failed; hbSent is reset by execAPI_REGCONF.  For non-REP nodes,
 * reportConnected() sets hbFrequency to 0 until the first API_REGCONF
 * arrives.  During that window every pass sends a heartbeat and the
 * failure check is disabled.
 *
 * Between passes the thread sleeps 100 ms with the facade mutex released.
 */
void
ClusterMgr::threadMain( ){
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber   = GSN_API_REGREQ;
  signal.theReceiversBlockNumber = QMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegReq::SignalLength;

  ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  req->version = NDB_VERSION;

  // Elapsed time (ms) of the previous sleep; seeded with the nominal period.
  Uint32 timeSlept = 100;
  Uint64 now = NdbTick_CurrentMillisecond();

  while(!theStop){
    /**
     * Start of Secure area for use of Transporter
     */
    // Sample and clear the "heartbeat now" request once per pass.
    int send_heartbeat_now= global_flag_send_heartbeat_now;
    global_flag_send_heartbeat_now= 0;

    theFacade.lock_mutex();
    for (int i = 1; i < MAX_NODES; i++){
      /**
       * Send register request (heartbeat) to all available nodes
       * at specified timing intervals
       */
      const NodeId nodeId = i;
      Node & theNode = theNodes[nodeId];

      if (!theNode.defined)
	continue;

      if (theNode.connected == false){
	theFacade.doConnect(nodeId);
	continue;
      }

      if (!theNode.compatible){
	continue;
      }

      theNode.hbCounter += timeSlept;
      if (theNode.hbCounter >= theNode.hbFrequency || send_heartbeat_now) {
	/**
	 * It is now time to send a new Heartbeat
	 */
	if (theNode.hbCounter >= theNode.hbFrequency) {
	  theNode.hbSent++;
	  theNode.hbCounter = 0;
	}
	/**
	 * If the node is of type REP,
	 * then the receiver of the signal should be API_CLUSTERMGR
	 *
	 * NOTE(review): `signal` is shared by all nodes and all passes, and
	 * its receiver block is only ever set back to QMGR before the loop.
	 * Once a REP node has been visited, every later heartbeat (to other
	 * nodes, and in later passes) is therefore also addressed to
	 * API_CLUSTERMGR.  This looks like a bug: the receiver should be
	 * chosen per node.
	 */
	if (theNode.m_info.m_type == NodeInfo::REP) {
	  signal.theReceiversBlockNumber = API_CLUSTERMGR;
	}
#if 0
	ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
	theFacade.sendSignalUnCond(&signal, nodeId);
      }//if

      // Four interval-driven heartbeats without an API_REGCONF reply.
      if (theNode.hbSent == 4 && theNode.hbFrequency > 0){
	reportNodeFailed(i);
      }//if
    }

    /**
     * End of secure area. Let other threads in
     */
    theFacade.unlock_mutex();

    // Sleep for 100 ms between each Registration Heartbeat
    Uint64 before = now;
    NdbSleep_MilliSleep(100);
    now = NdbTick_CurrentMillisecond();
    // Use the measured elapsed time rather than the nominal 100 ms.
    timeSlept = (now - before);
  }
}

#if 0
// Debug dump, compiled out.  The members it prints are not referenced
// anywhere else in this file.
void
ClusterMgr::showState(NodeId nodeId){
  ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
  ndbout << "theNodeList      = " << theNodeList[nodeId] << endl;
  ndbout << "theNodeState     = " << theNodeState[nodeId] << endl;
  ndbout << "theNodeCount     = " << theNodeCount[nodeId] << endl;
  ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
  ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
}
#endif

/**
 * Default state for a node slot: not defined, not connected, not alive;
 * compatible and nfCompleteRep start out true; start level SL_NOTHING.
 *
 * NOTE(review): hbFrequency, hbCounter and hbSent are not initialised here.
 * reportConnected() sets hbSent/hbCounter for every node but hbFrequency
 * only for non-REP nodes, and init() sets hbFrequency only for EXT_REP.
 * A plain NODE_TYPE_REP node therefore never gets hbFrequency set in this
 * file; confirm the header default-initialises these fields.
 */
ClusterMgr::Node::Node()
  : m_state(NodeState::SL_NOTHING) {
  compatible = nfCompleteRep = true;
  connected = defined = m_alive = false;
}

/******************************************************************************
 * API_REGREQ and friends
 ******************************************************************************/

/**
 * Handles an API_REGREQ (registration/heartbeat request) from a peer.
 *
 * If the peer's version changed, it is recorded and compatibility is
 * recomputed: the peer is incompatible when its major or its minor version
 * is lower than ours.  An API_REGCONF is always sent back, even to an
 * incompatible peer.  It carries our version and this node's current
 * hbFrequency.
 *
 * NOTE(review): the compatibility rule also rejects a peer whose major
 * version is HIGHER but whose minor version is lower.  It also differs from
 * the ndbCompatible_* checks used in execAPI_REGCONF.  Confirm intent.
 */
void
ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
  const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
  const NodeId nodeId = refToNode(apiRegReq->ref);

#if 0
  ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.defined == true);
  assert(node.connected == true);

  if(node.m_info.m_version != apiRegReq->version){
    node.m_info.m_version = apiRegReq->version;

    if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
	getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
      node.compatible = false;
    } else {
      node.compatible = true;
    }
  }

  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
  signal.theVerId_signalNumber   = GSN_API_REGCONF;
  signal.theReceiversBlockNumber = API_CLUSTERMGR;
  signal.theTrace                = 0;
  signal.theLength               = ApiRegConf::SignalLength;

  ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
  conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  conf->version = NDB_VERSION;
  conf->apiHeartbeatFrequency = node.hbFrequency;
  theFacade.sendSignalUnCond(&signal, nodeId);
}

/**
 * Selects the compatibility rule used by execAPI_REGCONF:
 * 1 -> ndbCompatible_mgmt_ndb, anything else -> ndbCompatible_api_ndb.
 */
int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;

/**
 * Handles an API_REGCONF (reply to our heartbeat) from a peer.
 *
 * - Records the peer version and recomputes compatibility (rule chosen by
 *   global_mgmt_server_check).
 * - Stores the peer's node state.  The node is alive only if it is
 *   compatible and at start level SL_STARTED or SL_SINGLEUSER.
 * - Resets hbSent/hbCounter, which re-arms failure detection in threadMain.
 * - For non-REP nodes, adopts the peer-provided interval:
 *   hbFrequency = apiHeartbeatFrequency * 10 - 50.  hbFrequency is compared
 *   with a millisecond counter in threadMain, so apiHeartbeatFrequency is
 *   presumably in 10 ms units.  The -50 presumably makes us send slightly
 *   before the peer's deadline.  Confirm both.
 *
 * NOTE(review): if apiHeartbeatFrequency < 5 and hbFrequency is unsigned,
 * the subtraction wraps to a huge interval, which effectively disables
 * heartbeats to that node.  Confirm the peer never sends such values.
 */
void
ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
  const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
  const NodeId nodeId = refToNode(apiRegConf->qmgrRef);

#if 0
  ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.defined == true);
  assert(node.connected == true);

  if(node.m_info.m_version != apiRegConf->version){
    node.m_info.m_version = apiRegConf->version;
    if (global_mgmt_server_check == 1)
      node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
					       node.m_info.m_version);
    else
      node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
					      node.m_info.m_version);
  }

  node.m_state = apiRegConf->nodeState;
  if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED  ||
			  node.m_state.startLevel == NodeState::SL_SINGLEUSER)){
    node.m_alive = true;
  } else {
    node.m_alive = false;
  }//if

  // A reply arrived: reset the missed-heartbeat bookkeeping.
  node.hbSent = 0;
  node.hbCounter = 0;
  if (node.m_info.m_type != NodeInfo::REP) {
    node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
  }
}

/**
 * Handles an API_REGREF: the peer refused our registration.
 *
 * Marks the node incompatible, not alive and at SL_NOTHING, and records the
 * version the peer reported.  WrongType is treated as fatal: a message is
 * printed and the process is aborted.  Other error codes are only recorded
 * through the state changes above.
 */
void
ClusterMgr::execAPI_REGREF(const Uint32 * theData){

  ApiRegRef * ref = (ApiRegRef*)theData;

  const NodeId nodeId = refToNode(ref->ref);

  assert(nodeId > 0 && nodeId < MAX_NODES);

  Node & node = theNodes[nodeId];
  assert(node.connected == true);
  assert(node.defined == true);

  node.compatible = false;
  node.m_alive = false;
  node.m_state = NodeState::SL_NOTHING;
  node.m_info.m_version = ref->version;

  switch(ref->errorCode){
  case ApiRegRef::WrongType:
    ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
    abort();
  case ApiRegRef::UnsupportedVersion:
  default:
    break;
  }
}

/**
 * Handles NODE_FAILREP: calls reportNodeFailed() for each node the report
 * marks as failed (presumably the nodes set in nodeFail->theNodes).
 */
void
ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
  NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
  // NOTE(review): text lost.  Between "i" and "theNodes" on the next line,
  // the original loop bound, increment, opening brace and an inner "if"
  // test on nodeFail->theNodes were stripped, so the braces below no
  // longer balance.  Restore from version control.
  for(int i = 1; itheNodes, i)){
      reportNodeFailed(i);
    }
  }
}

/**
 * Handles NF_COMPLETEREP: forwards "node failure handling complete" for
 * failedNodeId to the facade and marks that node's nfCompleteRep.
 */
void
ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){
  NFCompleteRep * const nfComp = (NFCompleteRep *)theData;

  const NodeId nodeId = nfComp->failedNodeId;
  assert(nodeId > 0 && nodeId < MAX_NODES);

  theFacade.ReportNodeFailureComplete(nodeId);
  theNodes[nodeId].nfCompleteRep = true;
}

/**
 * Transport-level connect notification for nodeId.
 *
 * Counts the connection, marks the node connected and optimistically
 * compatible, clears the heartbeat counters and the cached version, and
 * tells the facade the node is alive.
 */
void
ClusterMgr::reportConnected(NodeId nodeId){
  /**
   * Ensure that we are sending heartbeat every 100 ms
   * until we have got the first reply from NDB providing
   * us with the real time-out period to use.
   */
  assert(nodeId > 0 && nodeId < MAX_NODES);

  noOfConnectedNodes++;

  Node & theNode = theNodes[nodeId];
  theNode.connected = true;
  theNode.hbSent = 0;
  theNode.hbCounter = 0;
  // hbFrequency == 0 makes threadMain heartbeat on every pass and disables
  // its missed-heartbeat check until execAPI_REGCONF sets the real value.
  if (theNode.m_info.m_type != NodeInfo::REP) {
    theNode.hbFrequency = 0;
  }
  theNode.m_info.m_version = 0;
  theNode.compatible = true;
  theNode.nfCompleteRep = true;

  theFacade.ReportNodeAlive(nodeId);
}

/**
 * Transport-level disconnect notification for nodeId: decrements the
 * connection count, marks the node disconnected, bumps its connect counter
 * and runs the node-failure handling.
 */
void
ClusterMgr::reportDisconnected(NodeId nodeId){
  assert(nodeId > 0 && nodeId < MAX_NODES);
  assert(noOfConnectedNodes > 0);

  noOfConnectedNodes--;
  theNodes[nodeId].connected = false;

  theNodes[nodeId].m_info.m_connectCount ++;
  reportNodeFailed(nodeId);
}

/**
 * Marks nodeId as failed.
 *
 * - Clears m_alive and disconnects the transporter if still connected.
 * - Resets the start level to SL_NOTHING, calling the facade's
 *   ReportNodeDead only if the node previously had another start level.
 * - Clears nfCompleteRep.
 *
 * When no nodes remain connected, it builds an NFCompleteRep and presumably
 * completes failure handling for the other nodes; that code was lost
 * (see below).
 */
void
ClusterMgr::reportNodeFailed(NodeId nodeId){
  Node & theNode = theNodes[nodeId];

  theNode.m_alive = false;
  if(theNode.connected)
    theFacade.doDisconnect(nodeId);

  const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING);
  theNode.m_state.startLevel = NodeState::SL_NOTHING;

  if(report){
    theFacade.ReportNodeDead(nodeId);
  }

  theNode.nfCompleteRep = false;

  if(noOfConnectedNodes == 0){
    NFCompleteRep rep;
    // NOTE(review): text lost.  Everything between "i" and "threadMain();"
    // on the next line was stripped.  That includes the rest of this loop
    // and of reportNodeFailed, and whatever followed it in the original
    // file (presumably the ArbitMgr constructor and methods).  It also
    // includes the start of an ArbitMgr thread entry point, whose surviving
    // tail (threadMain(); my_thread_end(); NdbThread_Exit(0);
    // return NULL; }) mirrors runClusterMgr_C above.  Restore from version
    // control.
    for(Uint32 i = 1; ithreadMain();
  my_thread_end();
  NdbThread_Exit(0);
  return NULL;
}

/**
 * Hands a signal to the arbitrator thread through a one-slot buffer.
 *
 * Stamps the arrival time, waits on theInputCond until the slot is empty
 * (re-checking at least every 1000 timeout units), stores the signal,
 * marks the slot full and wakes the arbitrator thread.
 */
void
ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
{
#ifdef DEBUG_ARBIT
  char buf[17] = "";
  ndbout << "arbit recv: ";
  ndbout << " gsn=" << aSignal.gsn;
  ndbout << " send=" << aSignal.data.sender;
  ndbout << " code=" << aSignal.data.code;
  ndbout << " node=" << aSignal.data.node;
  ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
  ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
  ndbout << endl;
#endif
  aSignal.setTimestamp();       // signal arrival time
  NdbMutex_Lock(theInputMutex);
  while (theInputFull) {
    NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
  }
  theInputBuffer = aSignal;
  theInputFull = true;
  NdbCondition_Signal(theInputCond);
  NdbMutex_Unlock(theInputMutex);
}

/**
 * Arbitrator thread loop.
 *
 * The first signal is taken from theInputBuffer without waiting, without
 * taking theInputMutex and without clearing theInputFull, and is handled
 * as the start request.  Presumably the start request is placed there
 * before the thread is created, by code not visible here.  Confirm.
 *
 * Each iteration then waits for the input slot with timeout
 * theInputTimeout, calling threadTimeout() after every wait (including the
 * wait that ends because a signal arrived).  It consumes the slot, frees it
 * for sendSignalToThread(), and dispatches: ARBIT_CHOOSEREQ goes to
 * threadChoose(), ARBIT_STOPORD ends the loop, and other GSNs are ignored.
 * The final (STOPORD) signal is passed to threadStop().
 */
void
ArbitMgr::threadMain()
{
  ArbitSignal aSignal;
  aSignal = theInputBuffer;
  threadStart(aSignal);
  bool stop = false;
  while (! stop) {
    NdbMutex_Lock(theInputMutex);
    while (! theInputFull) {
      NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
      threadTimeout();
    }
    aSignal = theInputBuffer;
    theInputFull = false;
    NdbCondition_Signal(theInputCond);
    NdbMutex_Unlock(theInputMutex);
    switch (aSignal.gsn) {
    case GSN_ARBIT_CHOOSEREQ:
      threadChoose(aSignal);
      break;
    case GSN_ARBIT_STOPORD:
      stop = true;
      break;
    }
  }
  threadStop(aSignal);
}

// handle events in the thread

/**
 * Records the start request, replies ARBIT_STARTCONF with code ApiStart,
 * and enters StateStarted with input timeout 1000.
 */
void
ArbitMgr::threadStart(ArbitSignal& aSignal)
{
  theStartReq = aSignal;
  sendStartConf(theStartReq, ArbitCode::ApiStart);
  theState = StateStarted;
  theInputTimeout = 1000;
}

/**
 * Handles an ARBIT_CHOOSEREQ according to the current state.
 *
 * A request whose data does not match the start request is refused with
 * ErrTicket in every state.  For matching requests:
 *  - StateStarted: remember it as request 1.  If theDelay is 0, win it
 *    immediately (WinChoose) and finish.  Otherwise go to StateChoose1 with
 *    timeout 1, so threadTimeout() can check the delay promptly.
 *  - StateChoose1: remember it as request 2 and go to StateChoose2.
 *  - StateChoose2: a third request; refuse all three with ErrToomany and
 *    finish.  Because threadMain calls threadTimeout() after every wait,
 *    StateChoose2 is normally resolved on the next wake-up.  This branch is
 *    reached only when the third request is already queued when the loop
 *    re-checks theInputFull.
 *  - any other state: refuse with ErrState.
 */
void
ArbitMgr::threadChoose(ArbitSignal& aSignal)
{
  switch (theState) {
  case StateStarted:            // first REQ
    if (! theStartReq.data.match(aSignal.data)) {
      sendChooseRef(aSignal, ArbitCode::ErrTicket);
      break;
    }
    theChooseReq1 = aSignal;
    if (theDelay == 0) {
      sendChooseConf(aSignal, ArbitCode::WinChoose);
      theState = StateFinished;
      theInputTimeout = 1000;
      break;
    }
    theState = StateChoose1;
    theInputTimeout = 1;
    return;
  case StateChoose1:            // second REQ within Delay
    if (! theStartReq.data.match(aSignal.data)) {
      sendChooseRef(aSignal, ArbitCode::ErrTicket);
      break;
    }
    theChooseReq2 = aSignal;
    theState = StateChoose2;
    theInputTimeout = 1;
    return;
  case StateChoose2:            // too many REQs - refuse all
    if (! theStartReq.data.match(aSignal.data)) {
      sendChooseRef(aSignal, ArbitCode::ErrTicket);
      break;
    }
    sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
    sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
    sendChooseRef(aSignal, ArbitCode::ErrToomany);
    theState = StateFinished;
    theInputTimeout = 1000;
    return;
  default:
    sendChooseRef(aSignal, ArbitCode::ErrState);
    break;
  }
}

/**
 * Called by threadMain after every input wait.
 *
 *  - StateChoose1: once request 1 has waited at least theDelay (per
 *    getTimediff(), presumably measured from its setTimestamp()), it wins
 *    and the state becomes StateFinished.
 *  - StateChoose2: request 1 wins, request 2 loses, state StateFinished.
 *  - other states: nothing.
 */
void
ArbitMgr::threadTimeout()
{
  switch (theState) {
  case StateStarted:
    break;
  case StateChoose1:
    if (theChooseReq1.getTimediff() < theDelay)
      break;
    sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
    theState = StateFinished;
    theInputTimeout = 1000;
    break;
  case StateChoose2:
    sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
    sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
    theState = StateFinished;
    theInputTimeout = 1000;
    break;
  default:
    break;
  }
}

/**
 * Handles the final ARBIT_STOPORD.  Only StopExit produces output, and it
 * settles whatever is pending:
 *  - StateStarted: ARBIT_STOPREP with code 0;
 *  - StateChoose1: request 1 wins;
 *  - StateChoose2: request 1 wins, request 2 loses.
 * StopRequest and StopRestart do nothing here.
 */
void
ArbitMgr::threadStop(ArbitSignal& aSignal)
{
  switch (aSignal.data.code) {
  case StopExit:
    switch (theState) {
    case StateStarted:
      sendStopRep(theStartReq, 0);
      break;
    case StateChoose1:                  // just in time
      sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
      break;
    case StateChoose2:
      sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
      sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
      break;
    case StateInit:
    case StateFinished:
      //??
      break;
    }
    break;
  case StopRequest:
    break;
  case StopRestart:
    break;
  }
}

// output routines
// Each helper copies aSignal, sets the reply GSN and result code, and
// forwards the copy to QMGR via sendSignalToQmgr().

void
ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
{
  ArbitSignal copySignal = aSignal;
  copySignal.gsn = GSN_ARBIT_STARTCONF;
  copySignal.data.code = code;
  sendSignalToQmgr(copySignal);
}

void
ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
{
  ArbitSignal copySignal = aSignal;
  copySignal.gsn = GSN_ARBIT_CHOOSECONF;
  copySignal.data.code = code;
  sendSignalToQmgr(copySignal);
}

void
ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
{
  ArbitSignal copySignal = aSignal;
  copySignal.gsn = GSN_ARBIT_CHOOSEREF;
  copySignal.data.code = code;
  sendSignalToQmgr(copySignal);
}

void
ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
{
  ArbitSignal copySignal = aSignal;
  copySignal.gsn = GSN_ARBIT_STOPREP;
  copySignal.data.code = code;
  sendSignalToQmgr(copySignal);
}

/**
 * Send signal to QMGR. The input includes signal number and
 * signal data. The signal data is normally a copy of a received
 * signal so it contains expected arbitrator node id and ticket.
 * The sender in signal data is the QMGR node id.
 *
 * Concretely:
 *  - aSignal.data.sender is used as the destination node id;
 *  - in the outgoing signal, the sender field is replaced with this node's
 *    API_CLUSTERMGR reference;
 *  - code, node, ticket and mask are copied through unchanged.
 * The send is done with the facade mutex held.
 */
void
ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
{
  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  signal.theVerId_signalNumber = aSignal.gsn;
  signal.theReceiversBlockNumber = QMGR;
  signal.theTrace  = 0;
  signal.theLength = ArbitSignalData::SignalLength;

  ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());

  sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  sd->code = aSignal.data.code;
  sd->node = aSignal.data.node;
  sd->ticket = aSignal.data.ticket;
  sd->mask = aSignal.data.mask;

#ifdef DEBUG_ARBIT
  char buf[17] = "";
  ndbout << "arbit send: ";
  ndbout << " gsn=" << aSignal.gsn;
  ndbout << " recv=" << aSignal.data.sender;
  ndbout << " code=" << aSignal.data.code;
  ndbout << " node=" << aSignal.data.node;
  ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
  ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
  ndbout << endl;
#endif

  theFacade.lock_mutex();
  theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
  theFacade.unlock_mutex();
}