diff options
author | unknown <tomas@poseidon.ndb.mysql.com> | 2004-11-09 19:39:29 +0000 |
---|---|---|
committer | unknown <tomas@poseidon.ndb.mysql.com> | 2004-11-09 19:39:29 +0000 |
commit | 4c26186b4723f8cb9c44b1c9b0314d54d532f826 (patch) | |
tree | edc7c262c8c446aa592557dc15ccb01c2a6d7af5 /ndb | |
parent | 8e8c1a4756f04b582893965af0b780c2d32536f9 (diff) | |
download | mariadb-git-4c26186b4723f8cb9c44b1c9b0314d54d532f826.tar.gz |
added management function to purge stale sessions in the management server
ndb/include/util/Bitmask.hpp:
added bitXORC
ndb/include/util/SocketServer.hpp:
added method to apply function on each session
ndb/src/common/util/SocketServer.cpp:
added method to apply function on each session
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/include/mgmapi/mgmapi.h | 1 | ||||
-rw-r--r-- | ndb/include/util/Bitmask.hpp | 34 | ||||
-rw-r--r-- | ndb/include/util/SocketServer.hpp | 4 | ||||
-rw-r--r-- | ndb/src/common/util/SocketServer.cpp | 15 | ||||
-rw-r--r-- | ndb/src/mgmapi/mgmapi.cpp | 42 | ||||
-rw-r--r-- | ndb/src/mgmclient/CommandInterpreter.cpp | 46 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 88 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.hpp | 11 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.cpp | 42 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.hpp | 6 | ||||
-rw-r--r-- | ndb/src/mgmsrv/main.cpp | 3 |
11 files changed, 274 insertions, 18 deletions
diff --git a/ndb/include/mgmapi/mgmapi.h b/ndb/include/mgmapi/mgmapi.h index 6dcf58b44e2..f1ef357421b 100644 --- a/ndb/include/mgmapi/mgmapi.h +++ b/ndb/include/mgmapi/mgmapi.h @@ -733,6 +733,7 @@ extern "C" { int param, unsigned long long * value); int ndb_mgm_get_string_parameter(const ndb_mgm_configuration_iterator*, int param, const char ** value); + int ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **); #ifdef __cplusplus } #endif diff --git a/ndb/include/util/Bitmask.hpp b/ndb/include/util/Bitmask.hpp index bb217adab5f..19aa604e4a1 100644 --- a/ndb/include/util/Bitmask.hpp +++ b/ndb/include/util/Bitmask.hpp @@ -105,6 +105,11 @@ public: static void bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]); /** + * bitXORC - Bitwise (x ^ ~y) into first operand. + */ + static void bitXORC(unsigned size, Uint32 data[], const Uint32 data2[]); + + /** * contains - Check if all bits set in data2 are set in data */ static bool contains(unsigned size, Uint32 data[], const Uint32 data2[]); @@ -261,6 +266,14 @@ BitmaskImpl::bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]) } } +inline void +BitmaskImpl::bitXORC(unsigned size, Uint32 data[], const Uint32 data2[]) +{ + for (unsigned i = 0; i < size; i++) { + data[i] ^= ~data2[i]; + } +} + inline bool BitmaskImpl::contains(unsigned size, Uint32 data[], const Uint32 data2[]) { @@ -452,6 +465,12 @@ public: BitmaskPOD<size>& bitXOR(const BitmaskPOD<size>& mask2); /** + * bitXORC - Bitwise (x ^ ~y) into first operand. + */ + static void bitXORC(Uint32 data[], const Uint32 data2[]); + BitmaskPOD<size>& bitXORC(const BitmaskPOD<size>& mask2); + + /** * contains - Check if all bits set in data2 (that) are also set in data (this) */ static bool contains(Uint32 data[], const Uint32 data2[]); @@ -713,6 +732,21 @@ BitmaskPOD<size>::bitXOR(const BitmaskPOD<size>& mask2) } template <unsigned size> +inline void +BitmaskPOD<size>::bitXORC(Uint32 data[], const Uint32 data2[]) +{ + BitmaskImpl::bitXORC(size,data, data2); +} + +template <unsigned size> +inline BitmaskPOD<size>& +BitmaskPOD<size>::bitXORC(const BitmaskPOD<size>& mask2) +{ + BitmaskPOD<size>::bitXORC(rep.data, mask2.rep.data); + return *this; +} + +template <unsigned size> char * BitmaskPOD<size>::getText(const Uint32 data[], char* buf) { diff --git a/ndb/include/util/SocketServer.hpp b/ndb/include/util/SocketServer.hpp index 3860b9ca84b..2fad991e5f8 100644 --- a/ndb/include/util/SocketServer.hpp +++ b/ndb/include/util/SocketServer.hpp @@ -37,7 +37,7 @@ public: public: virtual ~Session() {} virtual void runSession(){} - virtual void stopSession(){} + virtual void stopSession(){ m_stop = true; } protected: friend class SocketServer; friend void* sessionThread_C(void*); @@ -98,6 +98,8 @@ public: */ void stopSessions(bool wait = false); + void foreachSession(void (*f)(SocketServer::Session*, void*), void *data); + private: struct SessionInstance { Service * m_service; diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp index c3cffa1399b..8bee256684d 100644 --- a/ndb/src/common/util/SocketServer.cpp +++ b/ndb/src/common/util/SocketServer.cpp @@ -259,6 +259,15 @@ transfer(NDB_SOCKET_TYPE sock){ } void +SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data) +{ + for(int i = m_sessions.size() - 1; i >= 0; i--){ + (*func)(m_sessions[i].m_session, data); + } + checkSessions(); +} + +void SocketServer::checkSessions(){ for(int i = m_sessions.size() - 1; i >= 0; i--){ if(m_sessions[i].m_session->m_stopped){ @@ -278,8 +287,10 @@ void SocketServer::stopSessions(bool wait){ int i; for(i = m_sessions.size() - 1; i>=0; i--) - m_sessions[i].m_session->m_stop = true; - + { + m_sessions[i].m_session->stopSession(); + m_sessions[i].m_session->m_stop = true; // to make sure + } for(i = m_services.size() - 1; i>=0; i--) m_services[i].m_service->stopSessions(); diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 4b62df968b3..66f0dbb1842 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -1834,4 +1834,46 @@ ndb_mgm_set_string_parameter(NdbMgmHandle handle, return res; } +extern "C" +int +ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **purged){ + CHECK_HANDLE(handle, 0); + CHECK_CONNECTED(handle, 0); + + Properties args; + + const ParserRow<ParserDummy> reply[]= { + MGM_CMD("purge stale sessions reply", NULL, ""), + MGM_ARG("purged", String, Optional, ""), + MGM_ARG("result", String, Mandatory, "Error message"), + MGM_END() + }; + + const Properties *prop; + prop= ndb_mgm_call(handle, reply, "purge stale sessions", &args); + + if(prop == NULL) { + SET_ERROR(handle, EIO, "Unable to purge stale sessions"); + return -1; + } + + int res= -1; + do { + const char * buf; + if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){ + ndbout_c("ERROR Message: %s\n", buf); + break; + } + if (purged) { + if (prop->get("purged", &buf)) + *purged= strdup(buf); + else + *purged= 0; + } + res= 0; + } while(0); + delete prop; + return res; +} + template class Vector<const ParserRow<ParserDummy>*>; diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index e802ffff5ce..523c271ed9e 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -106,6 +106,7 @@ private: */ void executeHelp(char* parameters); void executeShow(char* parameters); + void executePurge(char* parameters); void executeShutdown(char* parameters); void executeRun(char* parameters); void executeInfo(char* parameters); @@ -264,6 +265,7 @@ static const char* helpText = #ifdef HAVE_GLOBAL_REPLICATION "REP CONNECT <host:port> Connect to REP server on host:port\n" #endif +"PURGE STALE SESSIONS Reset reserved nodeid's in the mgmt server\n" "QUIT Quit management client\n" ; @@ -541,6 +543,10 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect) executeAbortBackup(allAfterFirstToken); return true; } + else if (strcmp(firstToken, "PURGE") == 0) { + executePurge(allAfterFirstToken); + return true; + } #ifdef HAVE_GLOBAL_REPLICATION else if(strcmp(firstToken, "REPLICATION") == 0 || strcmp(firstToken, "REP") == 0) { @@ -983,6 +989,46 @@ print_nodes(ndb_mgm_cluster_state *state, ndb_mgm_configuration_iterator *it, } void +CommandInterpreter::executePurge(char* parameters) +{ + int command_ok= 0; + do { + if (emptyString(parameters)) + break; + char* firstToken = strtok(parameters, " "); + char* nextToken = strtok(NULL, " \0"); + if (strcmp(firstToken,"STALE") == 0 && + nextToken && + strcmp(nextToken, "SESSIONS") == 0) { + command_ok= 1; + break; + } + } while(0); + + if (!command_ok) { + ndbout_c("Unexpected command, expected: PURGE STALE SESSIONS"); + return; + } + + int i; + char *str; + connect(); + + if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) { + ndbout_c("Command failed"); + return; + } + if (str) { + ndbout_c("Purged sessions with node id's: %s", str); + free(str); + } + else + { + ndbout_c("No sessions purged"); + } +} + +void CommandInterpreter::executeShow(char* parameters) { int i; diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 2e30d73290b..a49b29af275 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -400,11 +400,13 @@ MgmtSrvr::getPort() const { /* Constructor */ MgmtSrvr::MgmtSrvr(NodeId nodeId, + SocketServer *socket_server, const BaseString &configFilename, LocalConfig &local_config, Config * config): _blockNumber(1), // Hard coded block number since it makes it easy to send // signals to other management servers. + m_socket_server(socket_server), _ownReference(0), m_local_config(local_config), theSignalIdleList(NULL), @@ -2094,6 +2096,25 @@ MgmtSrvr::getNodeType(NodeId nodeId) const return nodeTypes[nodeId]; } +void +MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const +{ + if (theFacade && theFacade->theClusterMgr) + { + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) + { + const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); + if (node.connected) + { + connected_nodes.bitOR(node.m_state.m_connected_nodes); + } + } + } + } +} + bool MgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, @@ -2106,7 +2127,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, *nodeId, type, client_addr)); if (g_no_nodeid_checks) { if (*nodeId == 0) { - error_string.appfmt("no-nodeid-ckecks set in manegment server.\n" + error_string.appfmt("no-nodeid-checks set in management server.\n" "node id must be set explicitly in connectstring"); DBUG_RETURN(false); } @@ -2115,16 +2136,11 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, Guard g(m_node_id_mutex); int no_mgm= 0; NodeBitmask connected_nodes(m_reserved_nodes); - for(Uint32 i = 0; i < MAX_NODES; i++) + get_connected_nodes(connected_nodes); { - if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB && - theFacade && theFacade->theClusterMgr) { - const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); - if (node.connected) { - connected_nodes.bitOR(node.m_state.m_connected_nodes); - } - } else if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM) - no_mgm++; + for(Uint32 i = 0; i < MAX_NODES; i++) + if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM) + no_mgm++; } bool found_matching_id= false; bool found_matching_type= false; @@ -2227,6 +2243,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, m_connect_address[id_found].s_addr= 0; } m_reserved_nodes.set(id_found); + char tmp_str[128]; + m_reserved_nodes.getText(tmp_str); + g_EventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.", + id_found, get_connect_address(id_found), tmp_str); DBUG_RETURN(true); } @@ -2283,6 +2303,36 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, error_string.appfmt("No node defined with id=%d in config file.", *nodeId); } + + g_EventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. " + "Returned error string \"%s\"", + *nodeId, + client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>", + error_string.c_str()); + + NodeBitmask connected_nodes2; + get_connected_nodes(connected_nodes2); + { + BaseString tmp_connected, tmp_not_connected; + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (connected_nodes2.get(i)) + { + if (!m_reserved_nodes.get(i)) + tmp_connected.appfmt(" %d", i); + } + else if (m_reserved_nodes.get(i)) + { + tmp_not_connected.appfmt(" %d", i); + } + } + if (tmp_connected.length() > 0) + g_EventLogger.info("Mgmt server state: node id's %s connected but not reserved", + tmp_connected.c_str()); + if (tmp_not_connected.length() > 0) + g_EventLogger.info("Mgmt server state: node id's %s not connected but reserved", + tmp_not_connected.c_str()); + } DBUG_RETURN(false); } @@ -2531,10 +2581,15 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() { Guard g(m_mgmsrv.m_node_id_mutex); if (!m_reserved_nodes.isclear()) { + m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); // node has been reserved, force update signal to ndb nodes global_flag_send_heartbeat_now= 1; + + char tmp_str[128]; + m_mgmsrv.m_reserved_nodes.getText(tmp_str); + g_EventLogger.info("Mgmt server state: nodeid %d freed, m_reserved_nodes %s.", + get_nodeid(), tmp_str); } - m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); } void @@ -2543,6 +2598,17 @@ MgmtSrvr::Allocated_resources::reserve_node(NodeId id) m_reserved_nodes.set(id); } +NodeId +MgmtSrvr::Allocated_resources::get_nodeid() const +{ + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (m_reserved_nodes.get(i)) + return i; + } + return 0; +} + int MgmtSrvr::setDbParameter(int node, int param, const char * value, BaseString& msg){ diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index c796e1e9219..b3257491123 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -96,7 +96,10 @@ public: // methods to reserve/allocate resources which // will be freed when running destructor void reserve_node(NodeId id); - bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId);} + bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } + bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } + bool isclear() { return m_reserved_nodes.isclear(); } + NodeId get_nodeid() const; private: MgmtSrvr &m_mgmsrv; NodeBitmask m_reserved_nodes; @@ -173,6 +176,7 @@ public: /* Constructor */ MgmtSrvr(NodeId nodeId, /* Local nodeid */ + SocketServer *socket_server, const BaseString &config_filename, /* Where to save config */ LocalConfig &local_config, /* Ndb.cfg filename */ Config * config); @@ -499,6 +503,9 @@ public: int setDbParameter(int node, int parameter, const char * value, BaseString&); const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); } + void get_connected_nodes(NodeBitmask &connected_nodes) const; + SocketServer *get_socket_server() { return m_socket_server; } + //************************************************************************** private: //************************************************************************** @@ -525,6 +532,8 @@ private: int _blockNumber; NodeId _ownNodeId; + SocketServer *m_socket_server; + BlockReference _ownReference; NdbMutex *m_configMutex; const Config * _config; diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 2672d8c9d4b..0394c4e80bb 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -242,6 +242,8 @@ ParserRow<MgmApiSession> commands[] = { MGM_ARG("node", Int, Optional, "Node"), MGM_ARG("filter", String, Mandatory, "Event category"), + MGM_CMD("purge stale sessions", &MgmApiSession::purge_stale_sessions, ""), + MGM_END() }; @@ -1412,6 +1414,46 @@ done: m_output->println(""); } +struct PurgeStruct +{ + NodeBitmask free_nodes;/* free nodes as reported + * by ndbd in apiRegReqConf + */ + BaseString *str; +}; + +void +MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) +{ + MgmApiSession *s= (MgmApiSession *)_s; + struct PurgeStruct &ps= *(struct PurgeStruct *)data; + if (s->m_allocated_resources->is_reserved(ps.free_nodes)) + { + ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); + s->stopSession(); + } +} + +void +MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, + const class Properties &args) +{ + struct PurgeStruct ps; + BaseString str; + ps.str = &str; + + m_mgmsrv.get_connected_nodes(ps.free_nodes); + ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes + + m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps); + + m_output->println("purge stale sessions reply"); + if (str.length() > 0) + m_output->println("purged:%s",str.c_str()); + m_output->println("result: Ok"); + m_output->println(""); +} + template class MutexVector<int>; template class Vector<ParserRow<MgmApiSession> const*>; template class Vector<unsigned short>; diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp index e47820826b6..bfc915f18f1 100644 --- a/ndb/src/mgmsrv/Services.hpp +++ b/ndb/src/mgmsrv/Services.hpp @@ -28,7 +28,9 @@ /** Undefine this to remove backwards compatibility for "GET CONFIG". */ #define MGM_GET_CONFIG_BACKWARDS_COMPAT -class MgmApiSession : public SocketServer::Session { +class MgmApiSession : public SocketServer::Session +{ + static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); private: typedef Parser<MgmApiSession> Parser_t; @@ -84,6 +86,8 @@ public: void setParameter(Parser_t::Context &ctx, const class Properties &args); void listen_event(Parser_t::Context &ctx, const class Properties &args); + + void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args); void repCommand(Parser_t::Context &ctx, const class Properties &args); }; diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp index 15767e4766d..7a57fdeb77a 100644 --- a/ndb/src/mgmsrv/main.cpp +++ b/ndb/src/mgmsrv/main.cpp @@ -83,7 +83,6 @@ struct MgmGlobals { int g_no_nodeid_checks= 0; static MgmGlobals glob; - /****************************************************************************** * Function prototypes ******************************************************************************/ @@ -226,7 +225,7 @@ int main(int argc, char** argv) if (!readGlobalConfig()) goto error_end; - glob.mgmObject = new MgmtSrvr(glob.localNodeId, + glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer, BaseString(glob.config_filename), local_config, glob.cluster_config); |