diff options
author | unknown <stewart@willster.(none)> | 2006-08-08 11:59:21 +0800 |
---|---|---|
committer | unknown <stewart@willster.(none)> | 2006-08-08 11:59:21 +0800 |
commit | 507fffa5c7c79ed8e075f3a394774b750ed128a5 (patch) | |
tree | 48bb7da6f19c9a9f024ca2a23856e571ff06701e /ndb | |
parent | a7ba5b75a604ca7955fba507b5e457fb40a086a8 (diff) | |
parent | a610467f71781644f9cb0461765033e9ea3d3b66 (diff) | |
download | mariadb-git-507fffa5c7c79ed8e075f3a394774b750ed128a5.tar.gz |
Merge willster.(none):/home/stewart/Documents/MySQL/5.0/main
into willster.(none):/home/stewart/Documents/MySQL/5.0/bug13985
ndb/src/mgmsrv/MgmtSrvr.cpp:
Auto merged
ndb/src/mgmsrv/MgmtSrvr.hpp:
Auto merged
ndb/src/mgmsrv/Services.cpp:
Auto merged
ndb/src/mgmclient/CommandInterpreter.cpp:
manually merge parameter to pass print mutex to event thread
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/src/mgmclient/CommandInterpreter.cpp | 28 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 25 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.hpp | 3 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.cpp | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/ClusterMgr.cpp | 78 | ||||
-rw-r--r-- | ndb/src/ndbapi/ClusterMgr.hpp | 10 |
6 files changed, 137 insertions, 10 deletions
diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index 58b98671b14..103c252ca04 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -173,8 +173,15 @@ private: bool rep_connected; #endif struct NdbThread* m_event_thread; + NdbMutex *m_print_mutex; }; +struct event_thread_param { + NdbMgmHandle *m; + NdbMutex **p; +}; + +NdbMutex* print_mutex; /* * Facade object for CommandInterpreter @@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; + m_print_mutex= NdbMutex_Create(); #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; @@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); + NdbMutex_Destroy(m_print_mutex); } static bool @@ -444,11 +453,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* -event_thread_run(void* m) +event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); - NdbMgmHandle handle= *(NdbMgmHandle*)m; + struct event_thread_param param= *(struct event_thread_param*)p; + NdbMgmHandle handle= *(param.m); + NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, @@ -466,7 +477,11 @@ event_thread_run(void* m) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) - ndbout << tmp; + if(tmp && strlen(tmp)) + { + Guard g(printmutex); + ndbout << tmp; + } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); @@ -519,8 +534,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; + struct event_thread_param p; + p.m= &m_mgmsrv2; + p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, - (void**)&m_mgmsrv2, + (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); @@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; + return result; } @@ -920,6 +939,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { + Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 69c0286a1de..d514f0da1a4 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -1455,6 +1455,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include <ClusterMgr.hpp> +void +MgmtSrvr::updateStatus(NodeBitmask nodes) +{ + theFacade->theClusterMgr->forceHB(nodes); +} + int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, @@ -1979,6 +1985,25 @@ MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const } } +void +MgmtSrvr::get_connected_ndb_nodes(NodeBitmask &connected_nodes) const +{ + NodeBitmask ndb_nodes; + if (theFacade && theFacade->theClusterMgr) + { + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) + { + ndb_nodes.set(i); + const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); + connected_nodes.bitOR(node.m_state.m_connected_nodes); + } + } + } + connected_nodes.bitAND(ndb_nodes); +} + bool MgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index 187f225470a..ab71fe6f4dc 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -488,8 +488,11 @@ public: const char *get_connect_address(Uint32 node_id); void get_connected_nodes(NodeBitmask &connected_nodes) const; + void get_connected_ndb_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } + void updateStatus(NodeBitmask nodes); + //************************************************************************** private: //************************************************************************** diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 0524aba4c32..653f36ecc6d 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -982,6 +982,9 @@ printNodeStatus(OutputStream *output, MgmtSrvr &mgmsrv, enum ndb_mgm_node_type type) { NodeId nodeId = 0; + NodeBitmask hbnodes; + mgmsrv.get_connected_ndb_nodes(hbnodes); + mgmsrv.updateStatus(hbnodes); while(mgmsrv.getNextNodeId(&nodeId, type)) { enum ndb_mgm_node_status status; Uint32 startPhase = 0, diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index fbff57d3168..28f65eebde8 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -39,6 +39,8 @@ int global_flag_send_heartbeat_now= 0; +//#define DEBUG_REG + // Just a C wrapper for threadMain extern "C" void* @@ -67,6 +69,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); + waitForHBCond= NdbCondition_Create(); + waitingForHB= false; noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; @@ -77,7 +81,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ClusterMgr::~ClusterMgr() { DBUG_ENTER("ClusterMgr::~ClusterMgr"); - doStop(); + doStop(); + NdbCondition_Destroy(waitForHBCond); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN; } @@ -164,6 +169,56 @@ ClusterMgr::doStop( ){ } void +ClusterMgr::forceHB(NodeBitmask waitFor) +{ + theFacade.lock_mutex(); + + if(waitingForHB) + { + NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + theFacade.unlock_mutex(); + return; + } + + global_flag_send_heartbeat_now= 1; + waitingForHB= true; + + waitForHBFromNodes= waitFor; +#ifdef DEBUG_REG + char buf[128]; + ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; +#endif + NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); + + signal.theVerId_signalNumber = GSN_API_REGREQ; + signal.theReceiversBlockNumber = QMGR; + signal.theTrace = 0; + signal.theLength = ApiRegReq::SignalLength; + + ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); + req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); + req->version = NDB_VERSION; + + int nodeId= 0; + for(int i=0; + NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i)); + i= nodeId+1) + { +#ifdef DEBUG_REG + ndbout << "FORCE HB to " << nodeId << endl; +#endif + theFacade.sendSignalUnCond(&signal, nodeId); + } + + NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + waitingForHB= false; +#ifdef DEBUG_REG + ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; +#endif + theFacade.unlock_mutex(); +} + +void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); @@ -226,7 +281,7 @@ ClusterMgr::threadMain( ){ if (theNode.m_info.m_type == NodeInfo::REP) { signal.theReceiversBlockNumber = API_CLUSTERMGR; } -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); @@ -278,7 +333,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif @@ -319,7 +374,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif @@ -351,6 +406,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ if (node.m_info.m_type != NodeInfo::REP) { node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; } + + if(waitingForHB) + { + waitForHBFromNodes.clear(nodeId); + + if(waitForHBFromNodes.isclear()) + { + waitingForHB= false; + NdbCondition_Broadcast(waitForHBCond); + } + } } void @@ -379,6 +445,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){ default: break; } + + waitForHBFromNodes.clear(nodeId); + if(waitForHBFromNodes.isclear()) + NdbCondition_Signal(waitForHBCond); } void diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp index 1a1e622a889..b9863821b4f 100644 --- a/ndb/src/ndbapi/ClusterMgr.hpp +++ b/ndb/src/ndbapi/ClusterMgr.hpp @@ -49,7 +49,9 @@ public: void doStop(); void startThread(); - + + void forceHB(NodeBitmask waitFor); + private: void threadMain(); @@ -85,7 +87,11 @@ private: Uint32 noOfConnectedNodes; Node theNodes[MAX_NODES]; NdbThread* theClusterMgrThread; - + + NodeBitmask waitForHBFromNodes; // used in forcing HBs + NdbCondition* waitForHBCond; + bool waitingForHB; + /** * Used for controlling start/stop of the thread */ |