/* Copyright (c) 2003-2007 MySQL AB Use is subject to license terms This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ #include #include #include #include #include "TransporterFacade.hpp" #include "ClusterMgr.hpp" #include #include "NdbApiSignal.hpp" #include "API.hpp" #include #include #include #include #include #include #include #include #include int global_flag_skip_invalidate_cache = 0; //#define DEBUG_REG // Just a C wrapper for threadMain extern "C" void* runClusterMgr_C(void * me) { ((ClusterMgr*) me)->threadMain(); return NULL; } extern "C" { void ndbSetOwnVersion(); } ClusterMgr::ClusterMgr(TransporterFacade & _facade): theStop(0), theFacade(_facade) { DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); waitForHBCond= NdbCondition_Create(); waitingForHB= false; m_max_api_reg_req_interval= 0xFFFFFFFF; // MAX_INT noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; m_connect_count = 0; m_cluster_state = CS_waiting_for_clean_cache; DBUG_VOID_RETURN; } ClusterMgr::~ClusterMgr() { DBUG_ENTER("ClusterMgr::~ClusterMgr"); doStop(); NdbCondition_Destroy(waitForHBCond); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN; } void ClusterMgr::init(ndb_mgm_configuration_iterator & iter){ for(iter.first(); iter.valid(); iter.next()){ Uint32 tmp = 0; if(iter.get(CFG_NODE_ID, &tmp)) continue; theNodes[tmp].defined = true; #if 0 ndbout << "--------------------------------------" << endl; ndbout << "--------------------------------------" << endl; ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp)); #endif unsigned type; if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; switch(type){ case NODE_TYPE_DB: theNodes[tmp].m_info.m_type = NodeInfo::DB; break; case NODE_TYPE_API: theNodes[tmp].m_info.m_type = NodeInfo::API; break; case NODE_TYPE_MGM: theNodes[tmp].m_info.m_type = NodeInfo::MGM; break; default: type = type; #if 0 ndbout_c("ClusterMgr: Unknown node type: %d", type); #endif } } } void ClusterMgr::startThread() { NdbMutex_Lock(clusterMgrThreadMutex); theStop = 0; theClusterMgrThread = NdbThread_Create(runClusterMgr_C, (void**)this, 32768, "ndb_clustermgr", NDB_THREAD_PRIO_LOW); NdbMutex_Unlock(clusterMgrThreadMutex); } void ClusterMgr::doStop( ){ DBUG_ENTER("ClusterMgr::doStop"); NdbMutex_Lock(clusterMgrThreadMutex); if(theStop){ NdbMutex_Unlock(clusterMgrThreadMutex); DBUG_VOID_RETURN; } void *status; theStop = 1; if (theClusterMgrThread) { NdbThread_WaitFor(theClusterMgrThread, &status); NdbThread_Destroy(&theClusterMgrThread); } NdbMutex_Unlock(clusterMgrThreadMutex); DBUG_VOID_RETURN; } void ClusterMgr::forceHB() { theFacade.lock_mutex(); if(waitingForHB) { NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); theFacade.unlock_mutex(); return; } waitingForHB= true; NodeBitmask ndb_nodes; ndb_nodes.clear(); waitForHBFromNodes.clear(); for(Uint32 i = 0; i < MAX_NODES; i++) { if(!theNodes[i].defined) continue; if(theNodes[i].m_info.m_type == NodeInfo::DB) { ndb_nodes.set(i); const ClusterMgr::Node &node= getNodeInfo(i); waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes); } } waitForHBFromNodes.bitAND(ndb_nodes); #ifdef DEBUG_REG char buf[128]; ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGREQ; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ApiRegReq::SignalLength; ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); req->version = NDB_VERSION; int nodeId= 0; for(int i=0; (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.find(i)); i= nodeId+1) { #ifdef DEBUG_REG ndbout << "FORCE HB to " << nodeId << endl; #endif theFacade.sendSignalUnCond(&signal, nodeId); } /* Wait for nodes to reply - if any heartbeats was sent */ if (!waitForHBFromNodes.isclear()) NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); waitingForHB= false; #ifdef DEBUG_REG ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif theFacade.unlock_mutex(); } void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGREQ; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ApiRegReq::SignalLength; ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); req->version = NDB_VERSION; Uint32 timeSlept = 100; Uint64 now = NdbTick_CurrentMillisecond(); while(!theStop){ /** * Start of Secure area for use of Transporter */ if (m_cluster_state == CS_waiting_for_clean_cache) { theFacade.m_globalDictCache.lock(); unsigned sz= theFacade.m_globalDictCache.get_size(); theFacade.m_globalDictCache.unlock(); if (sz) goto next; m_cluster_state = CS_waiting_for_first_connect; } theFacade.lock_mutex(); for (int i = 1; i < MAX_NDB_NODES; i++){ /** * Send register request (heartbeat) to all available nodes * at specified timing intervals */ const NodeId nodeId = i; Node & theNode = theNodes[nodeId]; if (!theNode.defined) continue; if (theNode.connected == false){ theFacade.doConnect(nodeId); continue; } if (!theNode.compatible){ continue; } theNode.hbCounter += timeSlept; if (theNode.hbCounter >= m_max_api_reg_req_interval || theNode.hbCounter >= theNode.hbFrequency) { /** * It is now time to send a new Heartbeat */ if (theNode.hbCounter >= theNode.hbFrequency) { theNode.m_info.m_heartbeat_cnt++; theNode.hbCounter = 0; } #ifdef DEBUG_REG ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); }//if if (theNode.m_info.m_heartbeat_cnt == 4 && theNode.hbFrequency > 0){ reportNodeFailed(i); }//if } /** * End of secure area. Let other threads in */ theFacade.unlock_mutex(); next: // Sleep for 100 ms between each Registration Heartbeat Uint64 before = now; NdbSleep_MilliSleep(100); now = NdbTick_CurrentMillisecond(); timeSlept = (now - before); } } #if 0 void ClusterMgr::showState(NodeId nodeId){ ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl; ndbout << "theNodeList = " << theNodeList[nodeId] << endl; ndbout << "theNodeState = " << theNodeState[nodeId] << endl; ndbout << "theNodeCount = " << theNodeCount[nodeId] << endl; ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl; ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl; } #endif ClusterMgr::Node::Node() : m_state(NodeState::SL_NOTHING) { compatible = nfCompleteRep = true; connected = defined = m_alive = m_api_reg_conf = false; m_state.m_connected_nodes.clear(); } /****************************************************************************** * API_REGREQ and friends ******************************************************************************/ void ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); #ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); if(node.m_info.m_version != apiRegReq->version){ node.m_info.m_version = apiRegReq->version; if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) || getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) { node.compatible = false; } else { node.compatible = true; } } NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGCONF; signal.theReceiversBlockNumber = API_CLUSTERMGR; signal.theTrace = 0; signal.theLength = ApiRegConf::SignalLength; ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend()); conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId()); conf->version = NDB_VERSION; conf->apiHeartbeatFrequency = node.hbFrequency; theFacade.sendSignalUnCond(&signal, nodeId); } void ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); #ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); if(node.m_info.m_version != apiRegConf->version){ node.m_info.m_version = apiRegConf->version; if(theNodes[theFacade.ownId()].m_info.m_type == NodeInfo::MGM) node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, node.m_info.m_version); else node.compatible = ndbCompatible_api_ndb(NDB_VERSION, node.m_info.m_version); } node.m_api_reg_conf = true; node.m_state = apiRegConf->nodeState; if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED || node.m_state.getSingleUserMode())){ set_node_alive(node, true); } else { set_node_alive(node, false); }//if node.m_info.m_heartbeat_cnt = 0; node.hbCounter = 0; if(waitingForHB) { waitForHBFromNodes.clear(nodeId); if(waitForHBFromNodes.isclear()) { waitingForHB= false; NdbCondition_Broadcast(waitForHBCond); } } node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; } void ClusterMgr::execAPI_REGREF(const Uint32 * theData){ ApiRegRef * ref = (ApiRegRef*)theData; const NodeId nodeId = refToNode(ref->ref); assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.connected == true); assert(node.defined == true); node.compatible = false; set_node_alive(node, false); node.m_state = NodeState::SL_NOTHING; node.m_info.m_version = ref->version; switch(ref->errorCode){ case ApiRegRef::WrongType: ndbout_c("Node %d reports that this node should be a NDB node", nodeId); abort(); case ApiRegRef::UnsupportedVersion: default: break; } waitForHBFromNodes.clear(nodeId); if(waitForHBFromNodes.isclear()) NdbCondition_Signal(waitForHBCond); } void ClusterMgr::execNODE_FAILREP(const Uint32 * theData){ NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0]; for(int i = 1; itheNodes, i)){ reportNodeFailed(i); } } } void ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){ NFCompleteRep * const nfComp = (NFCompleteRep *)theData; const NodeId nodeId = nfComp->failedNodeId; assert(nodeId > 0 && nodeId < MAX_NODES); theFacade.ReportNodeFailureComplete(nodeId); theNodes[nodeId].nfCompleteRep = true; } void ClusterMgr::reportConnected(NodeId nodeId){ DBUG_ENTER("ClusterMgr::reportConnected"); DBUG_PRINT("info", ("nodeId: %u", nodeId)); /** * Ensure that we are sending heartbeat every 100 ms * until we have got the first reply from NDB providing * us with the real time-out period to use. */ assert(nodeId > 0 && nodeId < MAX_NODES); noOfConnectedNodes++; Node & theNode = theNodes[nodeId]; theNode.connected = true; theNode.m_info.m_heartbeat_cnt = 0; theNode.hbCounter = 0; /** * make sure the node itself is marked connected even * if first API_REGCONF has not arrived */ theNode.m_state.m_connected_nodes.set(nodeId); theNode.hbFrequency = 0; theNode.m_info.m_version = 0; theNode.compatible = true; theNode.nfCompleteRep = true; theNode.m_state.startLevel = NodeState::SL_NOTHING; theFacade.ReportNodeAlive(nodeId); DBUG_VOID_RETURN; } void ClusterMgr::reportDisconnected(NodeId nodeId){ assert(nodeId > 0 && nodeId < MAX_NODES); assert(noOfConnectedNodes > 0); noOfConnectedNodes--; theNodes[nodeId].connected = false; theNodes[nodeId].m_api_reg_conf = false; theNodes[nodeId].m_state.m_connected_nodes.clear(); reportNodeFailed(nodeId, true); } void ClusterMgr::reportNodeFailed(NodeId nodeId, bool disconnect){ Node & theNode = theNodes[nodeId]; set_node_alive(theNode, false); theNode.m_info.m_connectCount ++; if(theNode.connected) { theFacade.doDisconnect(nodeId); } const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING); theNode.m_state.startLevel = NodeState::SL_NOTHING; if(disconnect || report) { theFacade.ReportNodeDead(nodeId); } theNode.nfCompleteRep = false; if(noOfAliveNodes == 0) { if (!global_flag_skip_invalidate_cache) { theFacade.m_globalDictCache.lock(); theFacade.m_globalDictCache.invalidate_all(); theFacade.m_globalDictCache.unlock(); m_connect_count ++; m_cluster_state = CS_waiting_for_clean_cache; } NFCompleteRep rep; for(Uint32 i = 1; ithreadMain(); return NULL; } void ArbitMgr::sendSignalToThread(ArbitSignal& aSignal) { #ifdef DEBUG_ARBIT char buf[17] = ""; ndbout << "arbit recv: "; ndbout << " gsn=" << aSignal.gsn; ndbout << " send=" << aSignal.data.sender; ndbout << " code=" << aSignal.data.code; ndbout << " node=" << aSignal.data.node; ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf)); ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf)); ndbout << endl; #endif aSignal.setTimestamp(); // signal arrival time NdbMutex_Lock(theInputMutex); while (theInputFull) { NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000); } theInputBuffer = aSignal; theInputFull = true; NdbCondition_Signal(theInputCond); NdbMutex_Unlock(theInputMutex); } void ArbitMgr::threadMain() { ArbitSignal aSignal; aSignal = theInputBuffer; threadStart(aSignal); bool stop = false; while (! stop) { NdbMutex_Lock(theInputMutex); while (! theInputFull) { NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout); threadTimeout(); } aSignal = theInputBuffer; theInputFull = false; NdbCondition_Signal(theInputCond); NdbMutex_Unlock(theInputMutex); switch (aSignal.gsn) { case GSN_ARBIT_CHOOSEREQ: threadChoose(aSignal); break; case GSN_ARBIT_STOPORD: stop = true; break; } } threadStop(aSignal); } // handle events in the thread void ArbitMgr::threadStart(ArbitSignal& aSignal) { theStartReq = aSignal; sendStartConf(theStartReq, ArbitCode::ApiStart); theState = StateStarted; theInputTimeout = 1000; } void ArbitMgr::threadChoose(ArbitSignal& aSignal) { switch (theState) { case StateStarted: // first REQ if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } theChooseReq1 = aSignal; if (theDelay == 0) { sendChooseConf(aSignal, ArbitCode::WinChoose); theState = StateFinished; theInputTimeout = 1000; break; } theState = StateChoose1; theInputTimeout = 1; return; case StateChoose1: // second REQ within Delay if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } theChooseReq2 = aSignal; theState = StateChoose2; theInputTimeout = 1; return; case StateChoose2: // too many REQs - refuse all if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } sendChooseRef(theChooseReq1, ArbitCode::ErrToomany); sendChooseRef(theChooseReq2, ArbitCode::ErrToomany); sendChooseRef(aSignal, ArbitCode::ErrToomany); theState = StateFinished; theInputTimeout = 1000; return; default: sendChooseRef(aSignal, ArbitCode::ErrState); break; } } void ArbitMgr::threadTimeout() { switch (theState) { case StateStarted: break; case StateChoose1: if (theChooseReq1.getTimediff() < theDelay) break; sendChooseConf(theChooseReq1, ArbitCode::WinChoose); theState = StateFinished; theInputTimeout = 1000; break; case StateChoose2: sendChooseConf(theChooseReq1, ArbitCode::WinChoose); sendChooseConf(theChooseReq2, ArbitCode::LoseChoose); theState = StateFinished; theInputTimeout = 1000; break; default: break; } } void ArbitMgr::threadStop(ArbitSignal& aSignal) { switch (aSignal.data.code) { case StopExit: switch (theState) { case StateStarted: sendStopRep(theStartReq, 0); break; case StateChoose1: // just in time sendChooseConf(theChooseReq1, ArbitCode::WinChoose); break; case StateChoose2: sendChooseConf(theChooseReq1, ArbitCode::WinChoose); sendChooseConf(theChooseReq2, ArbitCode::LoseChoose); break; case StateInit: case StateFinished: //?? break; } break; case StopRequest: break; case StopRestart: break; } } // output routines void ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_STARTCONF; copySignal.data.code = code; sendSignalToQmgr(copySignal); } void ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_CHOOSECONF; copySignal.data.code = code; sendSignalToQmgr(copySignal); } void ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_CHOOSEREF; copySignal.data.code = code; sendSignalToQmgr(copySignal); } void ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_STOPREP; copySignal.data.code = code; sendSignalToQmgr(copySignal); } /** * Send signal to QMGR. The input includes signal number and * signal data. The signal data is normally a copy of a received * signal so it contains expected arbitrator node id and ticket. * The sender in signal data is the QMGR node id. */ void ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal) { NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = aSignal.gsn; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ArbitSignalData::SignalLength; ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend()); sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId()); sd->code = aSignal.data.code; sd->node = aSignal.data.node; sd->ticket = aSignal.data.ticket; sd->mask = aSignal.data.mask; #ifdef DEBUG_ARBIT char buf[17] = ""; ndbout << "arbit send: "; ndbout << " gsn=" << aSignal.gsn; ndbout << " recv=" << aSignal.data.sender; ndbout << " code=" << aSignal.data.code; ndbout << " node=" << aSignal.data.node; ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf)); ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf)); ndbout << endl; #endif theFacade.lock_mutex(); theFacade.sendSignalUnCond(&signal, aSignal.data.sender); theFacade.unlock_mutex(); }