diff options
Diffstat (limited to 'storage/ndb/src/ndbapi/ClusterMgr.hpp')
-rw-r--r-- | storage/ndb/src/ndbapi/ClusterMgr.hpp | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/storage/ndb/src/ndbapi/ClusterMgr.hpp b/storage/ndb/src/ndbapi/ClusterMgr.hpp new file mode 100644 index 00000000000..20912938cf3 --- /dev/null +++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp @@ -0,0 +1,256 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef ClusterMgr_H +#define ClusterMgr_H + +#include "API.hpp" +#include <ndb_limits.h> +#include <NdbThread.h> +#include <NdbMutex.h> +#include <NdbCondition.h> +#include <signaldata/ArbitSignalData.hpp> +#include <signaldata/NodeStateSignalData.hpp> +#include <NodeInfo.hpp> +#include <NodeState.hpp> + +extern "C" void* runClusterMgr_C(void * me); + + +/** + * @class ClusterMgr + */ +class ClusterMgr { + friend void* runClusterMgr_C(void * me); + friend void execute(void *, struct SignalHeader * const, + Uint8, Uint32 * const, LinearSectionPtr ptr[3]); +public: + ClusterMgr(class TransporterFacade &); + ~ClusterMgr(); + void init(struct ndb_mgm_configuration_iterator & config); + + void reportConnected(NodeId nodeId); + void reportDisconnected(NodeId nodeId); + + bool checkUpgradeCompatability(Uint32 nodeVersion); + + void doStop(); + void startThread(); + + void forceHB(); + +private: + void threadMain(); + + int theStop; + class TransporterFacade & theFacade; + +public: + enum Cluster_state { + CS_waiting_for_clean_cache = 0, + CS_waiting_for_first_connect, + CS_connected + }; + struct Node { + Node(); + bool defined; + bool connected; // Transporter connected + bool compatible; // Version is compatible + bool nfCompleteRep; // NF Complete Rep has arrived + bool m_alive; // Node is alive + + NodeInfo m_info; + NodeState m_state; + + /** + * Heartbeat stuff + */ + Uint32 hbFrequency; // Heartbeat frequence + Uint32 hbCounter; // # milliseconds passed since last hb sent + }; + + const Node & getNodeInfo(NodeId) const; + Uint32 getNoOfConnectedNodes() const; + bool isClusterAlive() const; + void hb_received(NodeId); + + Uint32 m_connect_count; +private: + Uint32 noOfAliveNodes; + Uint32 noOfConnectedNodes; + Node theNodes[MAX_NODES]; + NdbThread* theClusterMgrThread; + + NodeBitmask waitForHBFromNodes; // used in forcing HBs + NdbCondition* waitForHBCond; + bool waitingForHB; + + enum Cluster_state m_cluster_state; + /** + * Used for controlling start/stop of the thread + */ + NdbMutex* clusterMgrThreadMutex; + + void showState(NodeId nodeId); + void reportNodeFailed(NodeId nodeId); + + /** + * Signals received + */ + void execAPI_REGREQ (const Uint32 * theData); + void execAPI_REGCONF (const Uint32 * theData); + void execAPI_REGREF (const Uint32 * theData); + void execNODE_FAILREP (const Uint32 * theData); + void execNF_COMPLETEREP(const Uint32 * theData); + + inline void set_node_alive(Node& node, bool alive){ + if(node.m_alive && !alive) + { + assert(noOfAliveNodes); + noOfAliveNodes--; + } + else if(!node.m_alive && alive) + { + noOfAliveNodes++; + } + node.m_alive = alive; + } +}; + +inline +const ClusterMgr::Node & +ClusterMgr::getNodeInfo(NodeId nodeId) const { + return theNodes[nodeId]; +} + +inline +Uint32 +ClusterMgr::getNoOfConnectedNodes() const { + return noOfConnectedNodes; +} + +inline +bool +ClusterMgr::isClusterAlive() const { + return noOfAliveNodes != 0; +} +inline +void +ClusterMgr::hb_received(NodeId nodeId) { + theNodes[nodeId].m_info.m_heartbeat_cnt= 0; +} + +/*****************************************************************************/ + +/** + * @class ArbitMgr + * Arbitration manager. Runs in separate thread. + * Started only by a request from the kernel. + */ + +extern "C" void* runArbitMgr_C(void* me); + +class ArbitMgr +{ +public: + ArbitMgr(class TransporterFacade &); + ~ArbitMgr(); + + inline void setRank(unsigned n) { theRank = n; } + inline void setDelay(unsigned n) { theDelay = n; } + + void doStart(const Uint32* theData); + void doChoose(const Uint32* theData); + void doStop(const Uint32* theData); + + friend void* runArbitMgr_C(void* me); + +private: + class TransporterFacade & theFacade; + unsigned theRank; + unsigned theDelay; + + void threadMain(); + NdbThread* theThread; + NdbMutex* theThreadMutex; // not really needed + + struct ArbitSignal { + GlobalSignalNumber gsn; + ArbitSignalData data; + NDB_TICKS timestamp; + + ArbitSignal() {} + + inline void init(GlobalSignalNumber aGsn, const Uint32* aData) { + gsn = aGsn; + if (aData != NULL) + memcpy(&data, aData, sizeof(data)); + else + memset(&data, 0, sizeof(data)); + } + + inline void setTimestamp() { + timestamp = NdbTick_CurrentMillisecond(); + } + + inline NDB_TICKS getTimediff() { + NDB_TICKS now = NdbTick_CurrentMillisecond(); + return now < timestamp ? 0 : now - timestamp; + } + }; + + NdbMutex* theInputMutex; + NdbCondition* theInputCond; + int theInputTimeout; + bool theInputFull; // the predicate + ArbitSignal theInputBuffer; // shared buffer + + void sendSignalToThread(ArbitSignal& aSignal); + + enum State { // thread states + StateInit, + StateStarted, // thread started + StateChoose1, // received one valid REQ + StateChoose2, // received two valid REQs + StateFinished // finished one way or other + }; + State theState; + + enum Stop { // stop code in ArbitSignal.data.code + StopExit = 1, // at API exit + StopRequest = 2, // request from kernel + StopRestart = 3 // stop before restart + }; + + void threadStart(ArbitSignal& aSignal); // handle thread events + void threadChoose(ArbitSignal& aSignal); + void threadTimeout(); + void threadStop(ArbitSignal& aSignal); + + ArbitSignal theStartReq; + ArbitSignal theChooseReq1; + ArbitSignal theChooseReq2; + ArbitSignal theStopOrd; + + void sendStartConf(ArbitSignal& aSignal, Uint32); + void sendChooseRef(ArbitSignal& aSignal, Uint32); + void sendChooseConf(ArbitSignal& aSignal, Uint32); + void sendStopRep(ArbitSignal& aSignal, Uint32); + + void sendSignalToQmgr(ArbitSignal& aSignal); +}; + +#endif |