diff options
author | tomas@poseidon.ndb.mysql.com <> | 2005-07-15 13:30:01 +0200 |
---|---|---|
committer | tomas@poseidon.ndb.mysql.com <> | 2005-07-15 13:30:01 +0200 |
commit | 52d40b3f606670289c8fa158480c18604aca3035 (patch) | |
tree | 7e4323e7c54a99a90d62e7411c4332b08fcacd1d /ndb | |
parent | 0487cb938e17a2334fb2890b891d1d8325229104 (diff) | |
parent | ca75369663b2b79624b530b25aa519162caffdeb (diff) | |
download | mariadb-git-52d40b3f606670289c8fa158480c18604aca3035.tar.gz |
Merge mysqldev@production.mysql.com:my/mysql-4.1-release
into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1-release
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/include/transporter/TransporterRegistry.hpp | 2 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/ConfigRetriever.cpp | 10 | ||||
-rw-r--r-- | ndb/src/common/transporter/Transporter.cpp | 15 | ||||
-rw-r--r-- | ndb/src/common/transporter/Transporter.hpp | 1 | ||||
-rw-r--r-- | ndb/src/common/transporter/TransporterRegistry.cpp | 6 | ||||
-rw-r--r-- | ndb/src/mgmclient/CommandInterpreter.cpp | 34 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 18 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.hpp | 3 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.cpp | 93 |
9 files changed, 152 insertions, 30 deletions
diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp index 8bd1de39ed5..410f3e1dc12 100644 --- a/ndb/include/transporter/TransporterRegistry.hpp +++ b/ndb/include/transporter/TransporterRegistry.hpp @@ -238,6 +238,8 @@ public: }; Vector<Transporter_interface> m_transporter_interface; void add_transporter_interface(const char *interf, unsigned short port); + + struct in_addr get_connect_address(NodeId node_id) const; protected: private: diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index 648f3b4a52c..b73bcea1bcc 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -131,16 +131,14 @@ ConfigRetriever::getConfig() { } ndb_mgm_configuration * -ConfigRetriever::getConfig(NdbMgmHandle m_handle){ - +ConfigRetriever::getConfig(NdbMgmHandle m_handle) +{ ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle,m_version); - if(conf == 0){ + if(conf == 0) + { setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); return 0; } - - ndb_mgm_disconnect(m_handle); - return conf; } diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp index b84f8f6fb5e..328ce2816de 100644 --- a/ndb/src/common/transporter/Transporter.cpp +++ b/ndb/src/common/transporter/Transporter.cpp @@ -74,6 +74,7 @@ Transporter::Transporter(TransporterRegistry &t_reg, m_connected = false; m_timeOutMillis = 1000; + m_connect_address.s_addr= 0; if (isServer) m_socket_client= 0; else @@ -98,6 +99,13 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { DBUG_RETURN(true); // TODO assert(0); } + { + struct sockaddr addr; + SOCKET_SIZE_TYPE addrlen= sizeof(addr); + int r= getpeername(sockfd, &addr, &addrlen); + m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr; + } + bool res = connect_server_impl(sockfd); if(res){ m_connected = true; @@ -164,6 +172,13 @@ Transporter::connect_client() { g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId); } + { + struct sockaddr addr; + SOCKET_SIZE_TYPE addrlen= sizeof(addr); + int r= getpeername(sockfd, &addr, &addrlen); + m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr; + } + bool res = connect_client_impl(sockfd); if(res){ m_connected = true; diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp index c3b0d144eaf..5f3f8063723 100644 --- a/ndb/src/common/transporter/Transporter.hpp +++ b/ndb/src/common/transporter/Transporter.hpp @@ -122,6 +122,7 @@ protected: private: SocketClient *m_socket_client; + struct in_addr m_connect_address; protected: Uint32 getErrorCount(); diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index ac6161b314e..0efad6d1a1d 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -51,6 +51,12 @@ extern int g_ndb_shm_signum; #include <EventLogger.hpp> extern EventLogger g_eventLogger; +struct in_addr +TransporterRegistry::get_connect_address(NodeId node_id) const +{ + return theTransporters[node_id]->m_connect_address; +} + SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) { DBUG_ENTER("SocketServer::Session * TransporterService::newSession"); diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index 34fe57d1fca..e1619917de5 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -455,11 +455,13 @@ static int do_event_thread; static void* event_thread_run(void* m) { + DBUG_ENTER("event_thread_run"); + NdbMgmHandle handle= *(NdbMgmHandle*)m; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 }; int fd = ndb_mgm_listen_event(handle, filter); - if (fd > 0) + if (fd != NDB_INVALID_SOCKET) { do_event_thread= 1; char *tmp= 0; @@ -468,20 +470,26 @@ event_thread_run(void* m) do { if (tmp == 0) NdbSleep_MilliSleep(10); if((tmp = in.gets(buf, 1024))) - ndbout << tmp; + { + const char ping_token[]= "<PING>"; + if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) + ndbout << tmp; + } } while(do_event_thread); + NDB_CLOSE_SOCKET(fd); } else { do_event_thread= -1; } - return NULL; + DBUG_RETURN(NULL); } bool CommandInterpreter::connect() { + DBUG_ENTER("CommandInterpreter::connect"); if(!m_connected) { if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) @@ -512,8 +520,19 @@ CommandInterpreter::connect() do_event_thread == 0 || do_event_thread == -1) { - printf("Warning, event thread startup failed, degraded printouts as result\n"); + DBUG_PRINT("info",("Warning, event thread startup failed, " + "degraded printouts as result, errno=%d", + errno)); + printf("Warning, event thread startup failed, " + "degraded printouts as result, errno=%d\n", errno); do_event_thread= 0; + if (m_event_thread) + { + void *res; + NdbThread_WaitFor(m_event_thread, &res); + NdbThread_Destroy(&m_event_thread); + } + ndb_mgm_disconnect(m_mgmsrv2); } } else @@ -521,6 +540,8 @@ CommandInterpreter::connect() printf("Warning, event connect failed, degraded printouts as result\n"); } m_connected= true; + DBUG_PRINT("info",("Connected to Management Server at: %s:%d", + host,port)); if (m_verbose) { printf("Connected to Management Server at: %s:%d\n", @@ -528,12 +549,13 @@ CommandInterpreter::connect() } } } - return m_connected; + DBUG_RETURN(m_connected); } bool CommandInterpreter::disconnect() { + DBUG_ENTER("CommandInterpreter::disconnect"); if (m_event_thread) { void *res; do_event_thread= 0; @@ -550,7 +572,7 @@ CommandInterpreter::disconnect() } m_connected= false; } - return true; + DBUG_RETURN(true); } //***************************************************************************** diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index ceaedc9955b..f17d2a41be1 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -2124,6 +2124,24 @@ MgmtSrvr::getNodeType(NodeId nodeId) const return nodeTypes[nodeId]; } +const char *MgmtSrvr::get_connect_address(Uint32 node_id) +{ + if (m_connect_address[node_id].s_addr == 0 && + theFacade && theFacade->theTransporterRegistry && + theFacade->theClusterMgr && + getNodeType(node_id) == NDB_MGM_NODE_TYPE_NDB) + { + const ClusterMgr::Node &node= + theFacade->theClusterMgr->getNodeInfo(node_id); + if (node.connected) + { + m_connect_address[node_id]= + theFacade->theTransporterRegistry->get_connect_address(node_id); + } + } + return inet_ntoa(m_connect_address[node_id]); +} + void MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const { diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index ce78983b3c3..2b87bb9416a 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -60,6 +60,7 @@ public: } void add_listener(const Event_listener&); + void check_listeners(); void update_max_log_level(const LogLevel&); void update_log_level(const LogLevel&); @@ -508,7 +509,7 @@ public: int setDbParameter(int node, int parameter, const char * value, BaseString&); - const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); } + const char *get_connect_address(Uint32 node_id); void get_connected_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 8ba8c2fe87e..00cf6390c73 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = { }; MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) - : SocketServer::Session(sock), m_mgmsrv(mgm) { + : SocketServer::Session(sock), m_mgmsrv(mgm) +{ + DBUG_ENTER("MgmApiSession::MgmApiSession"); m_input = new SocketInputStream(sock); m_output = new SocketOutputStream(sock); m_parser = new Parser_t(commands, *m_input, true, true, true); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); + DBUG_VOID_RETURN; } MgmApiSession::~MgmApiSession() { + DBUG_ENTER("MgmApiSession::~MgmApiSession"); if (m_input) delete m_input; if (m_output) @@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession() delete m_parser; if (m_allocated_resources) delete m_allocated_resources; + if(m_socket != NDB_INVALID_SOCKET) + { + NDB_CLOSE_SOCKET(m_socket); + m_socket= NDB_INVALID_SOCKET; + } + DBUG_VOID_RETURN; } void -MgmApiSession::runSession() { +MgmApiSession::runSession() +{ + DBUG_ENTER("MgmApiSession::runSession"); + Parser_t::Context ctx; while(!m_stop) { m_parser->run(ctx, *this); @@ -301,8 +314,13 @@ MgmApiSession::runSession() { break; } } - if(m_socket >= 0) + if(m_socket != NDB_INVALID_SOCKET) + { NDB_CLOSE_SOCKET(m_socket); + m_socket= NDB_INVALID_SOCKET; + } + + DBUG_VOID_RETURN; } #ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT @@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) Uint32 threshold; LogLevel::EventCategory cat; Logger::LoggerLevel severity; - int i; + int i, n; DBUG_ENTER("Ndb_mgmd_event_service::log"); DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId)); @@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) Vector<NDB_SOCKET_TYPE> copy; m_clients.lock(); - for(i = m_clients.size() - 1; i >= 0; i--){ - if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){ - if(m_clients[i].m_socket != NDB_INVALID_SOCKET && - println_socket(m_clients[i].m_socket, - MAX_WRITE_TIMEOUT, m_text) == -1){ - copy.push_back(m_clients[i].m_socket); + for(i = m_clients.size() - 1; i >= 0; i--) + { + if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)) + { + int fd= m_clients[i].m_socket; + if(fd != NDB_INVALID_SOCKET && + println_socket(fd, MAX_WRITE_TIMEOUT, m_text) == -1) + { + copy.push_back(fd); m_clients.erase(i, false); } } } m_clients.unlock(); - for(i = 0; (unsigned)i < copy.size(); i++){ - NDB_CLOSE_SOCKET(copy[i]); - } + if ((n= (int)copy.size())) + { + for(i= 0; i < n; i++) + NDB_CLOSE_SOCKET(copy[i]); - if(copy.size()){ LogLevel tmp; tmp.clear(); m_clients.lock(); - for(i = 0; (unsigned)i < m_clients.size(); i++){ + for(i= m_clients.size() - 1; i >= 0; i--) tmp.set_max(m_clients[i].m_logLevel); - } m_clients.unlock(); update_log_level(tmp); } @@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp) } void -Ndb_mgmd_event_service::add_listener(const Event_listener& client){ +Ndb_mgmd_event_service::check_listeners() +{ + int i, n= 0; + DBUG_ENTER("Ndb_mgmd_event_service::check_listeners"); + m_clients.lock(); + for(i= m_clients.size() - 1; i >= 0; i--) + { + int fd= m_clients[i].m_socket; + DBUG_PRINT("info",("%d %d",i,fd)); + char buf[1]; + buf[0]=0; + if (fd != NDB_INVALID_SOCKET && + println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1) + { + NDB_CLOSE_SOCKET(fd); + m_clients.erase(i, false); + n=1; + } + } + if (n) + { + LogLevel tmp; tmp.clear(); + for(i= m_clients.size() - 1; i >= 0; i--) + tmp.set_max(m_clients[i].m_logLevel); + update_log_level(tmp); + } + m_clients.unlock(); + DBUG_VOID_RETURN; +} + +void +Ndb_mgmd_event_service::add_listener(const Event_listener& client) +{ + DBUG_ENTER("Ndb_mgmd_event_service::add_listener"); + DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket)); + + check_listeners(); + m_clients.push_back(client); update_max_log_level(client.m_logLevel); + + DBUG_VOID_RETURN; } void |