summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.ndb.mysql.com>2005-07-15 13:30:01 +0200
committerunknown <tomas@poseidon.ndb.mysql.com>2005-07-15 13:30:01 +0200
commit701d93a420cb9289e4c2a3b1f5715c1fa64d07ae (patch)
tree7e4323e7c54a99a90d62e7411c4332b08fcacd1d
parentec27a6d3e8d289ab7afad18c5da8d8ce317c840b (diff)
parent61af1b50e9fd03e88940e31bcee5a5c9c286fefc (diff)
downloadmariadb-git-701d93a420cb9289e4c2a3b1f5715c1fa64d07ae.tar.gz
Merge mysqldev@production.mysql.com:my/mysql-4.1-releasemysql-4.1.13
into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1-release
-rw-r--r--ndb/include/transporter/TransporterRegistry.hpp2
-rw-r--r--ndb/src/common/mgmcommon/ConfigRetriever.cpp10
-rw-r--r--ndb/src/common/transporter/Transporter.cpp15
-rw-r--r--ndb/src/common/transporter/Transporter.hpp1
-rw-r--r--ndb/src/common/transporter/TransporterRegistry.cpp6
-rw-r--r--ndb/src/mgmclient/CommandInterpreter.cpp34
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.cpp18
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.hpp3
-rw-r--r--ndb/src/mgmsrv/Services.cpp93
9 files changed, 152 insertions, 30 deletions
diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp
index 8bd1de39ed5..410f3e1dc12 100644
--- a/ndb/include/transporter/TransporterRegistry.hpp
+++ b/ndb/include/transporter/TransporterRegistry.hpp
@@ -238,6 +238,8 @@ public:
};
Vector<Transporter_interface> m_transporter_interface;
void add_transporter_interface(const char *interf, unsigned short port);
+
+ struct in_addr get_connect_address(NodeId node_id) const;
protected:
private:
diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
index 648f3b4a52c..b73bcea1bcc 100644
--- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp
+++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
@@ -131,16 +131,14 @@ ConfigRetriever::getConfig() {
}
ndb_mgm_configuration *
-ConfigRetriever::getConfig(NdbMgmHandle m_handle){
-
+ConfigRetriever::getConfig(NdbMgmHandle m_handle)
+{
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle,m_version);
- if(conf == 0){
+ if(conf == 0)
+ {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return 0;
}
-
- ndb_mgm_disconnect(m_handle);
-
return conf;
}
diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp
index b84f8f6fb5e..328ce2816de 100644
--- a/ndb/src/common/transporter/Transporter.cpp
+++ b/ndb/src/common/transporter/Transporter.cpp
@@ -74,6 +74,7 @@ Transporter::Transporter(TransporterRegistry &t_reg,
m_connected = false;
m_timeOutMillis = 1000;
+ m_connect_address.s_addr= 0;
if (isServer)
m_socket_client= 0;
else
@@ -98,6 +99,13 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
DBUG_RETURN(true); // TODO assert(0);
}
+ {
+ struct sockaddr addr;
+ SOCKET_SIZE_TYPE addrlen= sizeof(addr);
+ int r= getpeername(sockfd, &addr, &addrlen);
+ m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
+ }
+
bool res = connect_server_impl(sockfd);
if(res){
m_connected = true;
@@ -164,6 +172,13 @@ Transporter::connect_client() {
g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
}
+ {
+ struct sockaddr addr;
+ SOCKET_SIZE_TYPE addrlen= sizeof(addr);
+ int r= getpeername(sockfd, &addr, &addrlen);
+ m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
+ }
+
bool res = connect_client_impl(sockfd);
if(res){
m_connected = true;
diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp
index c3b0d144eaf..5f3f8063723 100644
--- a/ndb/src/common/transporter/Transporter.hpp
+++ b/ndb/src/common/transporter/Transporter.hpp
@@ -122,6 +122,7 @@ protected:
private:
SocketClient *m_socket_client;
+ struct in_addr m_connect_address;
protected:
Uint32 getErrorCount();
diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp
index ac6161b314e..0efad6d1a1d 100644
--- a/ndb/src/common/transporter/TransporterRegistry.cpp
+++ b/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -51,6 +51,12 @@ extern int g_ndb_shm_signum;
#include <EventLogger.hpp>
extern EventLogger g_eventLogger;
+struct in_addr
+TransporterRegistry::get_connect_address(NodeId node_id) const
+{
+ return theTransporters[node_id]->m_connect_address;
+}
+
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp
index 34fe57d1fca..e1619917de5 100644
--- a/ndb/src/mgmclient/CommandInterpreter.cpp
+++ b/ndb/src/mgmclient/CommandInterpreter.cpp
@@ -455,11 +455,13 @@ static int do_event_thread;
static void*
event_thread_run(void* m)
{
+ DBUG_ENTER("event_thread_run");
+
NdbMgmHandle handle= *(NdbMgmHandle*)m;
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
int fd = ndb_mgm_listen_event(handle, filter);
- if (fd > 0)
+ if (fd != NDB_INVALID_SOCKET)
{
do_event_thread= 1;
char *tmp= 0;
@@ -468,20 +470,26 @@ event_thread_run(void* m)
do {
if (tmp == 0) NdbSleep_MilliSleep(10);
if((tmp = in.gets(buf, 1024)))
- ndbout << tmp;
+ {
+ const char ping_token[]= "<PING>";
+ if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
+ ndbout << tmp;
+ }
} while(do_event_thread);
+ NDB_CLOSE_SOCKET(fd);
}
else
{
do_event_thread= -1;
}
- return NULL;
+ DBUG_RETURN(NULL);
}
bool
CommandInterpreter::connect()
{
+ DBUG_ENTER("CommandInterpreter::connect");
if(!m_connected)
{
if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
@@ -512,8 +520,19 @@ CommandInterpreter::connect()
do_event_thread == 0 ||
do_event_thread == -1)
{
- printf("Warning, event thread startup failed, degraded printouts as result\n");
+ DBUG_PRINT("info",("Warning, event thread startup failed, "
+ "degraded printouts as result, errno=%d",
+ errno));
+ printf("Warning, event thread startup failed, "
+ "degraded printouts as result, errno=%d\n", errno);
do_event_thread= 0;
+ if (m_event_thread)
+ {
+ void *res;
+ NdbThread_WaitFor(m_event_thread, &res);
+ NdbThread_Destroy(&m_event_thread);
+ }
+ ndb_mgm_disconnect(m_mgmsrv2);
}
}
else
@@ -521,6 +540,8 @@ CommandInterpreter::connect()
printf("Warning, event connect failed, degraded printouts as result\n");
}
m_connected= true;
+ DBUG_PRINT("info",("Connected to Management Server at: %s:%d",
+ host,port));
if (m_verbose)
{
printf("Connected to Management Server at: %s:%d\n",
@@ -528,12 +549,13 @@ CommandInterpreter::connect()
}
}
}
- return m_connected;
+ DBUG_RETURN(m_connected);
}
bool
CommandInterpreter::disconnect()
{
+ DBUG_ENTER("CommandInterpreter::disconnect");
if (m_event_thread) {
void *res;
do_event_thread= 0;
@@ -550,7 +572,7 @@ CommandInterpreter::disconnect()
}
m_connected= false;
}
- return true;
+ DBUG_RETURN(true);
}
//*****************************************************************************
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index ceaedc9955b..f17d2a41be1 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -2124,6 +2124,24 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
return nodeTypes[nodeId];
}
+const char *MgmtSrvr::get_connect_address(Uint32 node_id)
+{
+ if (m_connect_address[node_id].s_addr == 0 &&
+ theFacade && theFacade->theTransporterRegistry &&
+ theFacade->theClusterMgr &&
+ getNodeType(node_id) == NDB_MGM_NODE_TYPE_NDB)
+ {
+ const ClusterMgr::Node &node=
+ theFacade->theClusterMgr->getNodeInfo(node_id);
+ if (node.connected)
+ {
+ m_connect_address[node_id]=
+ theFacade->theTransporterRegistry->get_connect_address(node_id);
+ }
+ }
+ return inet_ntoa(m_connect_address[node_id]);
+}
+
void
MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const
{
diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp
index ce78983b3c3..2b87bb9416a 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.hpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.hpp
@@ -60,6 +60,7 @@ public:
}
void add_listener(const Event_listener&);
+ void check_listeners();
void update_max_log_level(const LogLevel&);
void update_log_level(const LogLevel&);
@@ -508,7 +509,7 @@ public:
int setDbParameter(int node, int parameter, const char * value, BaseString&);
- const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
+ const char *get_connect_address(Uint32 node_id);
void get_connected_nodes(NodeBitmask &connected_nodes) const;
SocketServer *get_socket_server() { return m_socket_server; }
diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp
index 8ba8c2fe87e..00cf6390c73 100644
--- a/ndb/src/mgmsrv/Services.cpp
+++ b/ndb/src/mgmsrv/Services.cpp
@@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = {
};
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
- : SocketServer::Session(sock), m_mgmsrv(mgm) {
+ : SocketServer::Session(sock), m_mgmsrv(mgm)
+{
+ DBUG_ENTER("MgmApiSession::MgmApiSession");
m_input = new SocketInputStream(sock);
m_output = new SocketOutputStream(sock);
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
+ DBUG_VOID_RETURN;
}
MgmApiSession::~MgmApiSession()
{
+ DBUG_ENTER("MgmApiSession::~MgmApiSession");
if (m_input)
delete m_input;
if (m_output)
@@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession()
delete m_parser;
if (m_allocated_resources)
delete m_allocated_resources;
+ if(m_socket != NDB_INVALID_SOCKET)
+ {
+ NDB_CLOSE_SOCKET(m_socket);
+ m_socket= NDB_INVALID_SOCKET;
+ }
+ DBUG_VOID_RETURN;
}
void
-MgmApiSession::runSession() {
+MgmApiSession::runSession()
+{
+ DBUG_ENTER("MgmApiSession::runSession");
+
Parser_t::Context ctx;
while(!m_stop) {
m_parser->run(ctx, *this);
@@ -301,8 +314,13 @@ MgmApiSession::runSession() {
break;
}
}
- if(m_socket >= 0)
+ if(m_socket != NDB_INVALID_SOCKET)
+ {
NDB_CLOSE_SOCKET(m_socket);
+ m_socket= NDB_INVALID_SOCKET;
+ }
+
+ DBUG_VOID_RETURN;
}
#ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT
@@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Uint32 threshold;
LogLevel::EventCategory cat;
Logger::LoggerLevel severity;
- int i;
+ int i, n;
DBUG_ENTER("Ndb_mgmd_event_service::log");
DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId));
@@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Vector<NDB_SOCKET_TYPE> copy;
m_clients.lock();
- for(i = m_clients.size() - 1; i >= 0; i--){
- if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){
- if(m_clients[i].m_socket != NDB_INVALID_SOCKET &&
- println_socket(m_clients[i].m_socket,
- MAX_WRITE_TIMEOUT, m_text) == -1){
- copy.push_back(m_clients[i].m_socket);
+ for(i = m_clients.size() - 1; i >= 0; i--)
+ {
+ if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat))
+ {
+ int fd= m_clients[i].m_socket;
+ if(fd != NDB_INVALID_SOCKET &&
+ println_socket(fd, MAX_WRITE_TIMEOUT, m_text) == -1)
+ {
+ copy.push_back(fd);
m_clients.erase(i, false);
}
}
}
m_clients.unlock();
- for(i = 0; (unsigned)i < copy.size(); i++){
- NDB_CLOSE_SOCKET(copy[i]);
- }
+ if ((n= (int)copy.size()))
+ {
+ for(i= 0; i < n; i++)
+ NDB_CLOSE_SOCKET(copy[i]);
- if(copy.size()){
LogLevel tmp; tmp.clear();
m_clients.lock();
- for(i = 0; (unsigned)i < m_clients.size(); i++){
+ for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel);
- }
m_clients.unlock();
update_log_level(tmp);
}
@@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp)
}
void
-Ndb_mgmd_event_service::add_listener(const Event_listener& client){
+Ndb_mgmd_event_service::check_listeners()
+{
+ int i, n= 0;
+ DBUG_ENTER("Ndb_mgmd_event_service::check_listeners");
+ m_clients.lock();
+ for(i= m_clients.size() - 1; i >= 0; i--)
+ {
+ int fd= m_clients[i].m_socket;
+ DBUG_PRINT("info",("%d %d",i,fd));
+ char buf[1];
+ buf[0]=0;
+ if (fd != NDB_INVALID_SOCKET &&
+ println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1)
+ {
+ NDB_CLOSE_SOCKET(fd);
+ m_clients.erase(i, false);
+ n=1;
+ }
+ }
+ if (n)
+ {
+ LogLevel tmp; tmp.clear();
+ for(i= m_clients.size() - 1; i >= 0; i--)
+ tmp.set_max(m_clients[i].m_logLevel);
+ update_log_level(tmp);
+ }
+ m_clients.unlock();
+ DBUG_VOID_RETURN;
+}
+
+void
+Ndb_mgmd_event_service::add_listener(const Event_listener& client)
+{
+ DBUG_ENTER("Ndb_mgmd_event_service::add_listener");
+ DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket));
+
+ check_listeners();
+
m_clients.push_back(client);
update_max_log_level(client.m_logLevel);
+
+ DBUG_VOID_RETURN;
}
void