/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include "TransporterFacade.hpp" #include "ClusterMgr.hpp" #include #include "AttrType.hpp" #include "NdbApiSignal.hpp" #include "API.hpp" #include #include #include #include #include #include #include // Just a C wrapper for threadMain extern "C" void* runClusterMgr_C(void * me) { ((ClusterMgr*) me)->threadMain(); /** * Sleep to allow another thread that is not exiting to take control * of signals allocated by this thread * * see Ndb::~Ndb() in Ndbinit.cpp */ #ifdef NDB_OSE NdbSleep_MilliSleep(50); #endif NdbThread_Exit(0); return NULL; } extern "C" { void ndbSetOwnVersion(); } ClusterMgr::ClusterMgr(TransporterFacade & _facade): theStop(0), theFacade(_facade) { ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); noOfConnectedNodes = 0; } ClusterMgr::~ClusterMgr(){ doStop(); NdbMutex_Destroy(clusterMgrThreadMutex); } void ClusterMgr::init(const IPCConfig & config){ NodeId tmp = 0; while(config.getNextRemoteNodeId(tmp)) { theNodes[tmp].defined = true; #if 0 ndbout << "--------------------------------------" << endl; config.print(); ndbout << "--------------------------------------" << endl; ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp)); #endif if(strcmp(config.getNodeType(tmp), "DB") == 0) { 
theNodes[tmp].m_info.m_type = NodeInfo::DB; } else if(strcmp(config.getNodeType(tmp), "API") == 0) { theNodes[tmp].m_info.m_type = NodeInfo::API; } else if(strcmp(config.getNodeType(tmp), "MGM") == 0) { theNodes[tmp].m_info.m_type = NodeInfo::MGM; } else if(strcmp(config.getNodeType(tmp), "REP") == 0) { theNodes[tmp].m_info.m_type = NodeInfo::REP; } else if(strcmp(config.getNodeType(tmp), "EXTERNAL REP") == 0) { theNodes[tmp].m_info.m_type = NodeInfo::REP; theNodes[tmp].hbFrequency = config.getREPHBFrequency(tmp); assert(100 <= theNodes[tmp].hbFrequency && theNodes[tmp].hbFrequency < 60 * 60 * 1000); } else { #if 0 ndbout_c("ClusterMgr: Unknown node type: %s", config.getNodeType(tmp)); #endif } } } void ClusterMgr::startThread() { NdbMutex_Lock(clusterMgrThreadMutex); theStop = 0; theClusterMgrThread = NdbThread_Create(runClusterMgr_C, (void**)this, 32768, "ndb_clustermgr", NDB_THREAD_PRIO_LOW); NdbMutex_Unlock(clusterMgrThreadMutex); } void ClusterMgr::doStop( ){ NdbMutex_Lock(clusterMgrThreadMutex); if(theStop){ NdbMutex_Unlock(clusterMgrThreadMutex); return; } void *status; theStop = 1; NdbThread_WaitFor(theClusterMgrThread, &status); NdbThread_Destroy(&theClusterMgrThread); NdbMutex_Unlock(clusterMgrThreadMutex); } void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGREQ; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ApiRegReq::SignalLength; ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); req->version = NDB_VERSION; Uint32 timeSlept = 100; Uint64 now = NdbTick_CurrentMillisecond(); while(!theStop){ /** * Start of Secure area for use of Transporter */ theFacade.lock_mutex(); for (int i = 1; i < MAX_NODES; i++){ /** * Send register request (heartbeat) to all available nodes * at specified timing intervals */ const NodeId nodeId = i; Node & theNode = theNodes[nodeId]; 
if (theNode.defined == true) { #if 0 ndbout_c("ClusterMgr: compatible %d", (int)nodeId); #endif if (theNode.connected == false){ theFacade.doConnect(nodeId); continue; } #if 0 ndbout_c("ClusterMgr: connected %d", (int)nodeId); #endif theNode.hbCounter += timeSlept; if (theNode.hbCounter >= theNode.hbFrequency){ /** * It is now time to send a new Heartbeat */ theNode.hbSent++; theNode.hbCounter = 0; /** * If the node is of type REP, * then the receiver of the signal should be API_CLUSTERMGR */ if (theNode.m_info.m_type == NodeInfo::REP) { signal.theReceiversBlockNumber = API_CLUSTERMGR; } #if 0 ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); }//if if (theNode.hbSent == 4 && theNode.hbFrequency > 0){ reportNodeFailed(i); }//if }//if(defined) }//for /** * End of secure area. Let other threads in */ theFacade.unlock_mutex(); // Sleep for 100 ms between each Registration Heartbeat Uint64 before = now; NdbSleep_MilliSleep(100); now = NdbTick_CurrentMillisecond(); timeSlept = (now - before); } } #if 0 void ClusterMgr::showState(NodeId nodeId){ ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl; ndbout << "theNodeList = " << theNodeList[nodeId] << endl; ndbout << "theNodeState = " << theNodeState[nodeId] << endl; ndbout << "theNodeCount = " << theNodeCount[nodeId] << endl; ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl; ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl; } #endif ClusterMgr::Node::Node() : m_state(NodeState::SL_NOTHING) { compatible = nfCompleteRep = true; connected = defined = m_alive = false; } /****************************************************************************** * API_REGREQ and friends ******************************************************************************/ void ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); 
#if 0 ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); if(node.m_info.m_version != apiRegReq->version){ node.m_info.m_version = apiRegReq->version; if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) || getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) { node.compatible = false; } else { node.compatible = true; } } NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGCONF; signal.theReceiversBlockNumber = API_CLUSTERMGR; signal.theTrace = 0; signal.theLength = ApiRegConf::SignalLength; ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend()); conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId()); conf->version = NDB_VERSION; conf->apiHeartbeatFrequency = node.hbFrequency; theFacade.sendSignalUnCond(&signal, nodeId); } int global_mgmt_server_check = 0; // set to one in mgmtsrvr main; void ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.defined == true); assert(node.connected == true); if(node.m_info.m_version != apiRegConf->version){ node.m_info.m_version = apiRegConf->version; if (global_mgmt_server_check == 1) node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, node.m_info.m_version); else node.compatible = ndbCompatible_api_ndb(NDB_VERSION, node.m_info.m_version); } node.m_state = apiRegConf->nodeState; if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED || node.m_state.startLevel == NodeState::SL_SINGLEUSER)){ node.m_alive = true; } else { node.m_alive = false; }//if node.hbSent = 0; node.hbCounter = 0; if (node.m_info.m_type != NodeInfo::REP) { node.hbFrequency = 
(apiRegConf->apiHeartbeatFrequency * 10) - 50; } } void ClusterMgr::execAPI_REGREF(const Uint32 * theData){ ApiRegRef * ref = (ApiRegRef*)theData; const NodeId nodeId = refToNode(ref->ref); assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; assert(node.connected == true); assert(node.defined == true); node.compatible = false; node.m_alive = false; node.m_state = NodeState::SL_NOTHING; node.m_info.m_version = ref->version; switch(ref->errorCode){ case ApiRegRef::WrongType: ndbout_c("Node %d reports that this node should be a NDB node", nodeId); abort(); case ApiRegRef::UnsupportedVersion: default: break; } } void ClusterMgr::execNODE_FAILREP(const Uint32 * theData){ NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0]; for(int i = 1; itheNodes, i)){ reportNodeFailed(i); } } } void ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){ NFCompleteRep * const nfComp = (NFCompleteRep *)theData; const NodeId nodeId = nfComp->failedNodeId; assert(nodeId > 0 && nodeId < MAX_NODES); theFacade.ReportNodeFailureComplete(nodeId); theNodes[nodeId].nfCompleteRep = true; } void ClusterMgr::reportConnected(NodeId nodeId){ /** * Ensure that we are sending heartbeat every 100 ms * until we have got the first reply from NDB providing * us with the real time-out period to use. 
*/ assert(nodeId > 0 && nodeId < MAX_NODES); noOfConnectedNodes++; Node & theNode = theNodes[nodeId]; theNode.connected = true; theNode.hbSent = 0; theNode.hbCounter = 0; if (theNode.m_info.m_type != NodeInfo::REP) { theNode.hbFrequency = 0; } theNode.m_info.m_version = 0; theNode.compatible = true; theNode.nfCompleteRep = true; theFacade.ReportNodeAlive(nodeId); } void ClusterMgr::reportDisconnected(NodeId nodeId){ assert(nodeId > 0 && nodeId < MAX_NODES); assert(noOfConnectedNodes > 0); noOfConnectedNodes--; theNodes[nodeId].connected = false; theNodes[nodeId].m_info.m_connectCount ++; reportNodeFailed(nodeId); } void ClusterMgr::reportNodeFailed(NodeId nodeId){ Node & theNode = theNodes[nodeId]; theNode.m_alive = false; if(theNode.connected) theFacade.doDisconnect(nodeId); const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING); theNode.m_state.startLevel = NodeState::SL_NOTHING; if(report){ theFacade.ReportNodeDead(nodeId); } theNode.nfCompleteRep = false; if(noOfConnectedNodes == 0){ Uint32 theData[1]; NFCompleteRep * rep = (NFCompleteRep *)&theData[0]; for(Uint32 i = 1; ifailedNodeId = i; execNF_COMPLETEREP(theData); } } } } /****************************************************************************** * Arbitrator ******************************************************************************/ ArbitMgr::ArbitMgr(TransporterFacade & _fac) : theFacade(_fac) { theThreadMutex = NdbMutex_Create(); theInputCond = NdbCondition_Create(); theInputMutex = NdbMutex_Create(); theRank = 0; theDelay = 0; theThread = 0; theInputTimeout = 0; theInputFull = false; memset(&theInputFull, 0, sizeof(theInputFull)); theState = StateInit; memset(&theStartReq, 0, sizeof(theStartReq)); memset(&theChooseReq1, 0, sizeof(theChooseReq1)); memset(&theChooseReq2, 0, sizeof(theChooseReq2)); memset(&theStopOrd, 0, sizeof(theStopOrd)); } ArbitMgr::~ArbitMgr() { NdbMutex_Destroy(theThreadMutex); NdbCondition_Destroy(theInputCond); NdbMutex_Destroy(theInputMutex); } // Start 
arbitrator thread. This is kernel request. // First stop any previous thread since it is a left-over // which was never used and which now has wrong ticket. void ArbitMgr::doStart(const Uint32* theData) { ArbitSignal aSignal; NdbMutex_Lock(theThreadMutex); if (theThread != NULL) { aSignal.init(GSN_ARBIT_STOPORD, NULL); aSignal.data.code = StopRestart; sendSignalToThread(aSignal); void* value; NdbThread_WaitFor(theThread, &value); theThread = NULL; theState = StateInit; theInputFull = false; } aSignal.init(GSN_ARBIT_STARTREQ, theData); sendSignalToThread(aSignal); theThread = NdbThread_Create( runArbitMgr_C, (void**)this, 32768, "ndb_arbitmgr", NDB_THREAD_PRIO_HIGH); NdbMutex_Unlock(theThreadMutex); } // The "choose me" signal from a candidate. void ArbitMgr::doChoose(const Uint32* theData) { ArbitSignal aSignal; aSignal.init(GSN_ARBIT_CHOOSEREQ, theData); sendSignalToThread(aSignal); } // Stop arbitrator thread via stop signal from the kernel // or when exiting API program. void ArbitMgr::doStop(const Uint32* theData) { ArbitSignal aSignal; NdbMutex_Lock(theThreadMutex); if (theThread != NULL) { aSignal.init(GSN_ARBIT_STOPORD, theData); if (theData == 0) { aSignal.data.code = StopExit; } else { aSignal.data.code = StopRequest; } sendSignalToThread(aSignal); void* value; NdbThread_WaitFor(theThread, &value); theThread = NULL; theState = StateInit; } NdbMutex_Unlock(theThreadMutex); } // private methods extern "C" void* runArbitMgr_C(void* me) { ((ArbitMgr*) me)->threadMain(); NdbThread_Exit(0); return NULL; } void ArbitMgr::sendSignalToThread(ArbitSignal& aSignal) { #ifdef DEBUG_ARBIT char buf[17] = ""; ndbout << "arbit recv: "; ndbout << " gsn=" << aSignal.gsn; ndbout << " send=" << aSignal.data.sender; ndbout << " code=" << aSignal.data.code; ndbout << " node=" << aSignal.data.node; ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf)); ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf)); ndbout << endl; #endif aSignal.setTimestamp(); 
// signal arrival time NdbMutex_Lock(theInputMutex); while (theInputFull) { NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000); } theInputBuffer = aSignal; theInputFull = true; NdbCondition_Signal(theInputCond); NdbMutex_Unlock(theInputMutex); } void ArbitMgr::threadMain() { ArbitSignal aSignal; aSignal = theInputBuffer; threadStart(aSignal); bool stop = false; while (! stop) { NdbMutex_Lock(theInputMutex); while (! theInputFull) { NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout); threadTimeout(); } aSignal = theInputBuffer; theInputFull = false; NdbCondition_Signal(theInputCond); NdbMutex_Unlock(theInputMutex); switch (aSignal.gsn) { case GSN_ARBIT_CHOOSEREQ: threadChoose(aSignal); break; case GSN_ARBIT_STOPORD: stop = true; break; } } threadStop(aSignal); } // handle events in the thread void ArbitMgr::threadStart(ArbitSignal& aSignal) { theStartReq = aSignal; sendStartConf(theStartReq, ArbitCode::ApiStart); theState = StateStarted; theInputTimeout = 1000; } void ArbitMgr::threadChoose(ArbitSignal& aSignal) { switch (theState) { case StateStarted: // first REQ if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } theChooseReq1 = aSignal; if (theDelay == 0) { sendChooseConf(aSignal, ArbitCode::WinChoose); theState = StateFinished; theInputTimeout = 1000; break; } theState = StateChoose1; theInputTimeout = 1; return; case StateChoose1: // second REQ within Delay if (! theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } theChooseReq2 = aSignal; theState = StateChoose2; theInputTimeout = 1; return; case StateChoose2: // too many REQs - refuse all if (! 
theStartReq.data.match(aSignal.data)) { sendChooseRef(aSignal, ArbitCode::ErrTicket); break; } sendChooseRef(theChooseReq1, ArbitCode::ErrToomany); sendChooseRef(theChooseReq2, ArbitCode::ErrToomany); sendChooseRef(aSignal, ArbitCode::ErrToomany); theState = StateFinished; theInputTimeout = 1000; return; default: sendChooseRef(aSignal, ArbitCode::ErrState); break; } } void ArbitMgr::threadTimeout() { switch (theState) { case StateStarted: break; case StateChoose1: if (theChooseReq1.getTimediff() < theDelay) break; sendChooseConf(theChooseReq1, ArbitCode::WinChoose); theState = StateFinished; theInputTimeout = 1000; break; case StateChoose2: sendChooseConf(theChooseReq1, ArbitCode::WinChoose); sendChooseConf(theChooseReq2, ArbitCode::LoseChoose); theState = StateFinished; theInputTimeout = 1000; break; default: break; } } void ArbitMgr::threadStop(ArbitSignal& aSignal) { switch (aSignal.data.code) { case StopExit: switch (theState) { case StateStarted: sendStopRep(theStartReq, 0); break; case StateChoose1: // just in time sendChooseConf(theChooseReq1, ArbitCode::WinChoose); break; case StateChoose2: sendChooseConf(theChooseReq1, ArbitCode::WinChoose); sendChooseConf(theChooseReq2, ArbitCode::LoseChoose); break; case StateInit: case StateFinished: //?? 
break; } break; case StopRequest: break; case StopRestart: break; } } // output routines void ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_STARTCONF; copySignal.data.code = code; sendSignalToQmgr(copySignal); } void ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_CHOOSECONF; copySignal.data.code = code; sendSignalToQmgr(copySignal); } void ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_CHOOSEREF; copySignal.data.code = code; sendSignalToQmgr(copySignal); } void ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code) { ArbitSignal copySignal = aSignal; copySignal.gsn = GSN_ARBIT_STOPREP; copySignal.data.code = code; sendSignalToQmgr(copySignal); } /** * Send signal to QMGR. The input includes signal number and * signal data. The signal data is normally a copy of a received * signal so it contains expected arbitrator node id and ticket. * The sender in signal data is the QMGR node id. 
*/
void
ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
{
  // Outgoing signal, sent from this node's API_CLUSTERMGR block to QMGR.
  NdbApiSignal outSignal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));

  outSignal.theLength               = ArbitSignalData::SignalLength;
  outSignal.theTrace                = 0;
  outSignal.theReceiversBlockNumber = QMGR;
  outSignal.theVerId_signalNumber   = aSignal.gsn;

  // Fill in the payload: everything is copied from the input signal except
  // the sender field, which is replaced by our own block reference.
  ArbitSignalData* payload =
    CAST_PTR(ArbitSignalData, outSignal.getDataPtrSend());

  payload->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  payload->code   = aSignal.data.code;
  payload->node   = aSignal.data.node;
  payload->ticket = aSignal.data.ticket;
  payload->mask   = aSignal.data.mask;

#ifdef DEBUG_ARBIT
  char text[17] = "";
  ndbout << "arbit send: ";
  ndbout << " gsn=" << aSignal.gsn;
  ndbout << " recv=" << aSignal.data.sender;
  ndbout << " code=" << aSignal.data.code;
  ndbout << " node=" << aSignal.data.node;
  ndbout << " ticket=" << aSignal.data.ticket.getText(text, sizeof(text));
  ndbout << " mask=" << aSignal.data.mask.getText(text, sizeof(text));
  ndbout << endl;
#endif

  // The destination node is the sender recorded in the input signal data.
  // The facade mutex is held only for the duration of the send.
  theFacade.lock_mutex();
  theFacade.sendSignalUnCond(&outSignal, aSignal.data.sender);
  theFacade.unlock_mutex();
}