diff options
author | unknown <tomas@poseidon.bredbandsbolaget.se> | 2004-06-23 00:59:08 +0000 |
---|---|---|
committer | unknown <tomas@poseidon.bredbandsbolaget.se> | 2004-06-23 00:59:08 +0000 |
commit | f44c3ee911dcdd3d56b51d286d7497a2a71f342b (patch) | |
tree | cc02037b22a13e1ed0101c3b0af371530863efdc /ndb/src/common | |
parent | 190d9e72cbd7fa7b6b54c0fbc07e287b71967bb8 (diff) | |
parent | dc2544fdee9e510614cc7d674726ea388284e57f (diff) | |
download | mariadb-git-f44c3ee911dcdd3d56b51d286d7497a2a71f342b.tar.gz |
Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into poseidon.bredbandsbolaget.se:/home/tomas/mysql-4.1-ndb
ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp:
Auto merged
ndb/src/kernel/main.cpp:
Auto merged
ndb/src/mgmsrv/MgmtSrvr.cpp:
Auto merged
ndb/src/ndbapi/ClusterMgr.cpp:
Auto merged
ndb/src/ndbapi/TransporterFacade.cpp:
Auto merged
ndb/src/ndbapi/TransporterFacade.hpp:
Auto merged
Diffstat (limited to 'ndb/src/common')
-rw-r--r-- | ndb/src/common/mgmcommon/ConfigInfo.cpp | 43 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/ConfigRetriever.cpp | 24 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/LocalConfig.cpp | 6 | ||||
-rw-r--r-- | ndb/src/common/transporter/TCP_Transporter.cpp | 250 | ||||
-rw-r--r-- | ndb/src/common/transporter/TCP_Transporter.hpp | 83 | ||||
-rw-r--r-- | ndb/src/common/transporter/Transporter.cpp | 193 | ||||
-rw-r--r-- | ndb/src/common/transporter/Transporter.hpp | 109 | ||||
-rw-r--r-- | ndb/src/common/transporter/TransporterRegistry.cpp | 421 | ||||
-rw-r--r-- | ndb/src/common/util/Makefile.am | 3 | ||||
-rw-r--r-- | ndb/src/common/util/SocketAuthenticator.cpp | 63 | ||||
-rw-r--r-- | ndb/src/common/util/SocketClient.cpp | 90 | ||||
-rw-r--r-- | ndb/src/common/util/SocketServer.cpp | 2 |
12 files changed, 661 insertions, 626 deletions
diff --git a/ndb/src/common/mgmcommon/ConfigInfo.cpp b/ndb/src/common/mgmcommon/ConfigInfo.cpp index c2b5fdabf01..a1bd5f39d82 100644 --- a/ndb/src/common/mgmcommon/ConfigInfo.cpp +++ b/ndb/src/common/mgmcommon/ConfigInfo.cpp @@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule); /**************************************************************************** * Config Rules declarations ****************************************************************************/ -bool addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>§ions, - struct InitConfigFileParser::Context &ctx, - const char * ruleData); +bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>§ions, + struct InitConfigFileParser::Context &ctx, + const char * rule_data); +bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>§ions, + struct InitConfigFileParser::Context &ctx, + const char * rule_data); const ConfigInfo::ConfigRule ConfigInfo::m_ConfigRules[] = { - { addNodeConnections, 0 }, + { add_node_connections, 0 }, + { add_db_ports, 0 }, { 0, 0 } }; @@ -377,6 +381,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { (MAX_NODES - 1) }, { + CFG_DB_SERVER_PORT, + "ServerPort", + "DB", + "Port used to setup transporter", + ConfigInfo::USED, + false, + ConfigInfo::INT, + 2202, + 0, + 0x7FFFFFFF }, + + { CFG_DB_NO_REPLICAS, "NoOfReplicas", "DB", @@ -1231,7 +1247,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::STRING, - MANDATORY, + 0, 0, 0x7FFFFFFF }, @@ -1330,7 +1346,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::STRING, - MANDATORY, + 0, 0, 0x7FFFFFFF }, @@ -2510,10 +2526,14 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){ const char * compId; if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){ + require(ctx.m_currentSection->put("HostName", "")); + return true; +#if 0 ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section " "[%s] starting at line: %d", ctx.fname, ctx.m_sectionLineno); return false; +#endif } const Properties * computer; @@ -3158,9 +3178,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){ } bool -addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>§ions, +add_node_connections(Vector<ConfigInfo::ConfigRuleSection>§ions, struct InitConfigFileParser::Context &ctx, - const char * ruleData) + const char * rule_data) { Properties * props= ctx.m_config; Properties p_connections; @@ -3241,3 +3261,10 @@ addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>§ions, return true; } +bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>§ions, + struct InitConfigFileParser::Context &ctx, + const char * rule_data) +{ + return true; +} + diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index d2c622593de..c34d9bb01f9 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -114,7 +114,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) { struct ndb_mgm_configuration * p = 0; switch(m->type){ case MgmId_TCP: - p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, verId); + p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, + verId, nodeType); break; case MgmId_File: p = getConfig(m->data.file.filename, verId); @@ -155,7 +156,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) { ndb_mgm_configuration * ConfigRetriever::getConfig(const char * mgmhost, short port, - int versionId){ + int versionId, + int nodetype){ NdbMgmHandle h; h = ndb_mgm_create_handle(); @@ -175,6 +177,21 @@ ConfigRetriever::getConfig(const char * mgmhost, ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId); if(conf == 0){ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h)); + ndb_mgm_destroy_handle(&h); + return 0; + } + + { + unsigned nodeid= getOwnNodeId(); + + int res= ndb_mgm_alloc_nodeid(h, versionId, &nodeid, nodetype); + if(res != 0) { + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h)); + ndb_mgm_destroy_handle(&h); + return 0; + } + + _ownNodeId= nodeid; } ndb_mgm_disconnect(h); @@ -329,6 +346,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, } do { + if(strlen(hostname) == 0) + break; + if(strcasecmp(hostname, localhost) == 0) break; diff --git a/ndb/src/common/mgmcommon/LocalConfig.cpp b/ndb/src/common/mgmcommon/LocalConfig.cpp index 12e685ced34..67e92064e81 100644 --- a/ndb/src/common/mgmcommon/LocalConfig.cpp +++ b/ndb/src/common/mgmcommon/LocalConfig.cpp @@ -21,6 +21,7 @@ LocalConfig::LocalConfig(){ ids = 0; size = 0; items = 0; error_line = 0; error_msg[0] = 0; + _ownNodeId= 0; } bool @@ -95,6 +96,11 @@ LocalConfig::init(bool onlyNodeId, return false; } + //7. Check + if(readConnectString("host=localhost:2200", onlyNodeId)){ + return true; + } + setError(0, ""); return false; diff --git a/ndb/src/common/transporter/TCP_Transporter.cpp b/ndb/src/common/transporter/TCP_Transporter.cpp index 99b6a137797..8833b51e236 100644 --- a/ndb/src/common/transporter/TCP_Transporter.cpp +++ b/ndb/src/common/transporter/TCP_Transporter.cpp @@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void) #define ndbstrerror strerror #endif -TCP_Transporter::TCP_Transporter(int sendBufSize, int maxRecvSize, - int portNo, - const char *rHostName, +TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg, + int sendBufSize, int maxRecvSize, const char *lHostName, - NodeId rNodeId, NodeId lNodeId, + const char *rHostName, + int r_port, + NodeId lNodeId, + NodeId rNodeId, int byte_order, bool compr, bool chksm, bool signalId, Uint32 _reportFreq) : - Transporter(lNodeId, rNodeId, byte_order, compr, chksm, signalId), - m_sendBuffer(sendBufSize), - isServer(lNodeId < rNodeId), - port(portNo) + Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId, + byte_order, compr, chksm, signalId), + m_sendBuffer(sendBufSize) { maxReceiveSize = maxRecvSize; - strncpy(remoteHostName, rHostName, sizeof(remoteHostName)); - // Initialize member variables - Ndb_getInAddr(&remoteHostAddress, rHostName); - - Ndb_getInAddr(&localHostAddress, lHostName); theSocket = NDB_INVALID_SOCKET; sendCount = receiveCount = 0; @@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() { receiveBuffer.destroy(); } +bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) +{ + return connect_common(sockfd); +} + +bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) +{ + return connect_common(sockfd); +} + +bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) +{ + theSocket = sockfd; + setSocketOptions(); + setSocketNonBlocking(theSocket); + return true; +} + bool TCP_Transporter::initTransporter() { @@ -316,7 +330,7 @@ TCP_Transporter::doSend() { sendCount ++; sendSize += nBytesSent; if(sendCount == reportFreq){ - reportSendLen(callbackObj,remoteNodeId, sendCount, sendSize); + reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize); sendCount = 0; sendSize = 0; } @@ -331,7 +345,7 @@ TCP_Transporter::doSend() { #endif if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){ doDisconnect(); - reportDisconnect(callbackObj, remoteNodeId, InetErrno); + report_disconnect(InetErrno); } return false; @@ -361,14 +375,15 @@ TCP_Transporter::doReceive() { #endif ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)", receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer); - reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH); + report_error(TE_INVALID_MESSAGE_LENGTH); return 0; } receiveCount ++; receiveSize += nBytesRead; + if(receiveCount == reportFreq){ - reportReceiveLen(callbackObj, remoteNodeId, receiveCount, receiveSize); + reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize); receiveCount = 0; receiveSize = 0; } @@ -384,60 +399,17 @@ TCP_Transporter::doReceive() { if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){ // The remote node has closed down doDisconnect(); - reportDisconnect(callbackObj, remoteNodeId,InetErrno); + report_disconnect(InetErrno); } } return nBytesRead; } -bool -TCP_Transporter::connectImpl(Uint32 timeOutMillis){ - struct timeval timeout = {0, 0}; - timeout.tv_sec = timeOutMillis / 1000; - timeout.tv_usec = (timeOutMillis % 1000)*1000; - - bool retVal = false; - - if(isServer){ - if(theSocket == NDB_INVALID_SOCKET){ - startTCPServer(); - } - if(theSocket == NDB_INVALID_SOCKET) - { - NdbSleep_MilliSleep(timeOutMillis); - return false; - } - retVal = acceptClient(&timeout); - } else { - // Is client - retVal = connectClient(&timeout); - } - - if(!retVal) { - NdbSleep_MilliSleep(timeOutMillis); - return false; - } - -#if defined NDB_OSE || defined NDB_SOFTOSE - if(setsockopt(theSocket, SOL_SOCKET, SO_OSEOWNER, - &theReceiverPid, sizeof(PROCESS)) != 0){ - - ndbout << "Failed to transfer ownership of socket" << endl; - NDB_CLOSE_SOCKET(theSocket); - theSocket = -1; - return false; - } -#endif - - return true; -} - - void -TCP_Transporter::disconnectImpl() { +TCP_Transporter::disconnectImpl() { if(theSocket != NDB_INVALID_SOCKET){ if(NDB_CLOSE_SOCKET(theSocket) < 0){ - reportError(callbackObj, remoteNodeId, TE_ERROR_CLOSING_SOCKET); + report_error(TE_ERROR_CLOSING_SOCKET); } } @@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() { theSocket = NDB_INVALID_SOCKET; } - -bool -TCP_Transporter::startTCPServer() { - - int bindResult, listenResult; - - // The server variable is the remote server when we are a client - // htonl and htons returns the parameter in network byte order - // INADDR_ANY tells the OS kernel to choose the IP address - struct sockaddr_in server; - memset((void*)&server, 0, sizeof(server)); - server.sin_family = AF_INET; - server.sin_addr.s_addr = localHostAddress.s_addr; - server.sin_port = htons(port); - - if (theSocket != NDB_INVALID_SOCKET) { - return true; // Server socket is already initialized - } - - // Create the socket - theSocket = socket(AF_INET, SOCK_STREAM, 0); - if (theSocket == NDB_INVALID_SOCKET) { - reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET); - return false; - } - - // Set the socket reuse addr to true, so we are sure we can bind the - // socket - int reuseAddr = 1; - setsockopt(theSocket, SOL_SOCKET, SO_REUSEADDR, - (char*)&reuseAddr, sizeof(reuseAddr)); - - // Set the TCP_NODELAY option so also small packets are sent - // as soon as possible - int nodelay = 1; - setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY, - (char*)&nodelay, sizeof(nodelay)); - - // Bind the socket - bindResult = bind(theSocket, (struct sockaddr *) &server, - sizeof(server)); - if (bindResult < 0) { - reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET); - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; - } - - // Perform listen. - listenResult = listen(theSocket, 1); - if (listenResult == 1) { - reportThreadError(remoteNodeId, TE_LISTEN_FAILED); - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; - } - - return true; -} - - -bool -TCP_Transporter::acceptClient (struct timeval * timeout){ - - struct sockaddr_in clientAddress; - - fd_set readset; - FD_ZERO(&readset); - FD_SET(theSocket, &readset); - const int res = select(theSocket + 1, &readset, 0, 0, timeout); - if(res == 0) - return false; - - if(res < 0){ - reportThreadError(remoteNodeId, TE_ERROR_IN_SELECT_BEFORE_ACCEPT); - return false; - } - - NDB_SOCKLEN_T clientAddressLen = sizeof(clientAddress); - const NDB_SOCKET_TYPE clientSocket = accept(theSocket, - (struct sockaddr*)&clientAddress, - &clientAddressLen); - if (clientSocket == NDB_INVALID_SOCKET) { - reportThreadError(remoteNodeId, TE_ACCEPT_RETURN_ERROR); - return false; - } - - if (clientAddress.sin_addr.s_addr != remoteHostAddress.s_addr) { - ndbout_c("Wrong client connecting!"); - ndbout_c("connecting address: %s", inet_ntoa(clientAddress.sin_addr)); - ndbout_c("expecting address: %s", inet_ntoa(remoteHostAddress)); - // The newly connected host is not the remote host - // we wanted to connect to. Disconnect it. - // XXX This is not valid. We cannot disconnect it. - NDB_CLOSE_SOCKET(clientSocket); - return false; - } else { - NDB_CLOSE_SOCKET(theSocket); - theSocket = clientSocket; - setSocketOptions(); - setSocketNonBlocking(theSocket); - return true; - } -} - -bool -TCP_Transporter::connectClient (struct timeval * timeout){ - - // Create the socket - theSocket = socket(AF_INET, SOCK_STREAM, 0); - if (theSocket == NDB_INVALID_SOCKET) { - reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET); - return false; - } - - struct sockaddr_in server; - memset((void*)&server, 0, sizeof(server)); - server.sin_family = AF_INET; - server.sin_addr = remoteHostAddress; - server.sin_port = htons(port); - - struct sockaddr_in client; - memset((void*)&client, 0, sizeof(client)); - client.sin_family = AF_INET; - client.sin_addr = localHostAddress; - client.sin_port = 0; // Any port - - // Bind the socket - const int bindResult = bind(theSocket, (struct sockaddr *) &client, - sizeof(client)); - if (bindResult < 0) { - reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET); - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; - } - - const int connectRes = ::connect(theSocket, (struct sockaddr *) &server, - sizeof(server)); - if(connectRes == 0){ - setSocketOptions(); - setSocketNonBlocking(theSocket); - return true; - } - - NDB_CLOSE_SOCKET(theSocket); - theSocket = NDB_INVALID_SOCKET; - return false; -} - - - diff --git a/ndb/src/common/transporter/TCP_Transporter.hpp b/ndb/src/common/transporter/TCP_Transporter.hpp index 30b730a5b1c..958cfde03a1 100644 --- a/ndb/src/common/transporter/TCP_Transporter.hpp +++ b/ndb/src/common/transporter/TCP_Transporter.hpp @@ -14,24 +14,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -//**************************************************************************** -// -// AUTHOR -// Åsa Fransson -// -// NAME -// TCP_Transporter -// -// DESCRIPTION -// A TCP_Transporter instance is created when TCP/IP-communication -// shall be used (user specified). It handles connect, disconnect, -// send and receive. -// -// -// -//***************************************************************************/ -#ifndef TCP_Transporter_H -#define TCP_Transporter_H +#ifndef TCP_TRANSPORTER_HPP +#define TCP_TRANSPORTER_HPP #include "Transporter.hpp" #include "SendBuffer.hpp" @@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter { friend class TransporterRegistry; private: // Initialize member variables - TCP_Transporter(int sendBufferSize, int maxReceiveSize, - int port, - const char *rHostName, + TCP_Transporter(TransporterRegistry&, + int sendBufferSize, int maxReceiveSize, const char *lHostName, - NodeId rHostId, NodeId lHostId, + const char *rHostName, + int r_port, + NodeId lHostId, + NodeId rHostId, int byteorder, bool compression, bool checksum, bool signalId, Uint32 reportFreq = 4096); @@ -121,12 +107,14 @@ protected: * A client connects to the remote server * A server accepts any new connections */ - bool connectImpl(Uint32 timeOutMillis); + virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd); + virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd); + bool connect_common(NDB_SOCKET_TYPE sockfd); /** * Disconnects a TCP/IP node. Empty send and receivebuffer. */ - void disconnectImpl(); + virtual void disconnectImpl(); private: /** @@ -134,22 +122,12 @@ private: */ SendBuffer m_sendBuffer; - const bool isServer; - const unsigned int port; - // Sending/Receiving socket used by both client and server NDB_SOCKET_TYPE theSocket; Uint32 maxReceiveSize; /** - * Remote host name/and address - */ - char remoteHostName[256]; - struct in_addr remoteHostAddress; - struct in_addr localHostAddress; - - /** * Socket options */ int sockOptRcvBufSize; @@ -164,43 +142,6 @@ private: bool sendIsPossible(struct timeval * timeout); /** - * startTCPServer - None blocking - * - * create a server socket - * bind - * listen - * - * Note: Does not call accept - */ - bool startTCPServer(); - - /** - * acceptClient - Blocking - * - * Accept a connection - * checks if "right" client has connected - * if so - * close server socket - * else - * close newly created socket and goto begin - */ - bool acceptClient(struct timeval * timeout); - - /** - * Creates a client socket - * - * Note does not call connect - */ - bool createClientSocket(); - - /** - * connectClient - Blocking - * - * connects to remote host - */ - bool connectClient(struct timeval * timeout); - - /** * Statistics */ Uint32 reportFreq; diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp index 5ca523d5185..c6f93d2cbea 100644 --- a/ndb/src/common/transporter/Transporter.cpp +++ b/ndb/src/common/transporter/Transporter.cpp @@ -15,132 +15,125 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include <TransporterRegistry.hpp> +#include <TransporterCallback.hpp> #include "Transporter.hpp" #include "TransporterInternalDefinitions.hpp" #include <NdbSleep.h> - -Transporter::Transporter(NodeId lNodeId, NodeId rNodeId, +#include <SocketAuthenticator.hpp> +#include <InputStream.hpp> +#include <OutputStream.hpp> + +Transporter::Transporter(TransporterRegistry &t_reg, + const char *lHostName, + const char *rHostName, + int r_port, + NodeId lNodeId, + NodeId rNodeId, int _byteorder, bool _compression, bool _checksum, bool _signalId) - : localNodeId(lNodeId), remoteNodeId(rNodeId), - m_packer(_signalId, _checksum) + : m_r_port(r_port), localNodeId(lNodeId), remoteNodeId(rNodeId), + isServer(lNodeId < rNodeId), + m_packer(_signalId, _checksum), + m_transporter_registry(t_reg) { + if (rHostName && strlen(rHostName) > 0){ + strncpy(remoteHostName, rHostName, sizeof(remoteHostName)); + Ndb_getInAddr(&remoteHostAddress, rHostName); + } + else + { + if (!isServer) { + ndbout << "Unable to setup transporter. Node " << rNodeId + << " must have hostname. Update configuration." << endl; + exit(-1); + } + remoteHostName[0]= 0; + } + strncpy(localHostName, lHostName, sizeof(localHostName)); + + if (strlen(lHostName) > 0) + Ndb_getInAddr(&localHostAddress, lHostName); + byteOrder = _byteorder; compressionUsed = _compression; checksumUsed = _checksum; signalIdUsed = _signalId; - _threadError = TE_NO_ERROR; - - _connecting = false; - _disconnecting = false; - _connected = false; - _timeOutMillis = 1000; - theThreadPtr = NULL; - theMutexPtr = NdbMutex_Create(); -} - -Transporter::~Transporter(){ - NdbMutex_Destroy(theMutexPtr); + m_connected = false; + m_timeOutMillis = 1000; - if(theThreadPtr != 0){ - void * retVal; - NdbThread_WaitFor(theThreadPtr, &retVal); - NdbThread_Destroy(&theThreadPtr); + if (isServer) + m_socket_client= 0; + else + { + unsigned short tmp_port= 3307+rNodeId; + m_socket_client= new SocketClient(remoteHostName, tmp_port, + new SocketAuthSimple("ndbd passwd")); } } -extern "C" -void * -runConnect_C(void * me) -{ - runConnect(me); - NdbThread_Exit(0); - return NULL; -} - -void * -runConnect(void * me){ - Transporter * t = (Transporter *) me; - - DEBUG("Connect thread to " << t->remoteNodeId << " started"); - - while(true){ - NdbMutex_Lock(t->theMutexPtr); - if(t->_disconnecting){ - t->_connecting = false; - NdbMutex_Unlock(t->theMutexPtr); - DEBUG("Connect Thread " << t->remoteNodeId << " stop due to disconnect"); - return 0; - } - NdbMutex_Unlock(t->theMutexPtr); - - bool res = t->connectImpl(t->_timeOutMillis); // 1000 ms - DEBUG("Waiting for " << t->remoteNodeId << "..."); - if(res){ - t->_connected = true; - t->_connecting = false; - t->_errorCount = 0; - t->_threadError = TE_NO_ERROR; - DEBUG("Connect Thread " << t->remoteNodeId << " stop due to connect"); - return 0; - } - } +Transporter::~Transporter(){ + if (m_socket_client) + delete m_socket_client; } -void -Transporter::doConnect() { +bool +Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { + if(m_connected) + return true; // TODO assert(0); - NdbMutex_Lock(theMutexPtr); - if(_connecting || _disconnecting || _connected){ - NdbMutex_Unlock(theMutexPtr); - return; + bool res = connect_server_impl(sockfd); + if(res){ + m_connected = true; + m_errorCount = 0; } - - _connecting = true; - _threadError = TE_NO_ERROR; + return res; +} - // Start thread +bool +Transporter::connect_client() { + if(m_connected) + return true; + + NDB_SOCKET_TYPE sockfd = m_socket_client->connect(); - char buf[16]; - snprintf(buf, sizeof(buf), "ndb_con_%d", remoteNodeId); + if (sockfd < 0) + return false; + + // send info about own id + SocketOutputStream s_output(sockfd); + s_output.println("%d", localNodeId); + + // get remote id + int nodeId; + SocketInputStream s_input(sockfd); + char buf[256]; + if (s_input.gets(buf, 256) == 0) { + NDB_CLOSE_SOCKET(sockfd); + return false; + } + if (sscanf(buf, "%d", &nodeId) != 1) { + NDB_CLOSE_SOCKET(sockfd); + return false; + } - if(theThreadPtr != 0){ - void * retVal; - NdbThread_WaitFor(theThreadPtr, &retVal); - NdbThread_Destroy(&theThreadPtr); + bool res = connect_client_impl(sockfd); + if(res){ + m_connected = true; + m_errorCount = 0; } - - theThreadPtr = NdbThread_Create(runConnect_C, - (void**)this, - 32768, - buf, - NDB_THREAD_PRIO_LOW); - - NdbSleep_MilliSleep(100); // Let thread start - - NdbMutex_Unlock(theMutexPtr); + return res; } void -Transporter::doDisconnect() { - - NdbMutex_Lock(theMutexPtr); - _disconnecting = true; - while(_connecting){ - DEBUG("Waiting for connect to finish..."); - - NdbMutex_Unlock(theMutexPtr); - NdbSleep_MilliSleep(500); - NdbMutex_Lock(theMutexPtr); - } - - _connected = false; - +Transporter::doDisconnect() { + + if(!m_connected) + return; //assert(0); TODO will fail + disconnectImpl(); - _threadError = TE_NO_ERROR; - _disconnecting = false; - - NdbMutex_Unlock(theMutexPtr); + + m_connected= false; } diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp index 43b26d45899..9a39f8788bc 100644 --- a/ndb/src/common/transporter/Transporter.hpp +++ b/ndb/src/common/transporter/Transporter.hpp @@ -19,6 +19,9 @@ #include <ndb_global.h> +#include <SocketClient.hpp> + +#include <TransporterRegistry.hpp> #include <TransporterCallback.hpp> #include "TransporterDefinitions.hpp" #include "Packer.hpp" @@ -40,8 +43,9 @@ public: * None blocking * Use isConnected() to check status */ - virtual void doConnect(); - + bool connect_client(); + bool connect_server(NDB_SOCKET_TYPE socket); + /** * Blocking */ @@ -60,14 +64,17 @@ public: */ NodeId getRemoteNodeId() const; - /** - * Set callback object + * Local (own) Node Id */ - void setCallbackObject(void * callback); + NodeId getLocalNodeId() const; protected: - Transporter(NodeId lNodeId, + Transporter(TransporterRegistry &, + const char *lHostName, + const char *rHostName, + int r_port, + NodeId lNodeId, NodeId rNodeId, int byteorder, bool compression, @@ -78,58 +85,59 @@ protected: * Blocking, for max timeOut milli seconds * Returns true if connect succeded */ - virtual bool connectImpl(Uint32 timeOut) = 0; + virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0; + virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0; /** * Blocking */ virtual void disconnectImpl() = 0; - const NodeId localNodeId; + /** + * Remote host name/and address + */ + char remoteHostName[256]; + char localHostName[256]; + struct in_addr remoteHostAddress; + struct in_addr localHostAddress; + + const unsigned int m_r_port; + const NodeId remoteNodeId; + const NodeId localNodeId; + const bool isServer; + unsigned createIndex; int byteOrder; bool compressionUsed; bool checksumUsed; bool signalIdUsed; - Packer m_packer; - + Packer m_packer; private: - /** - * Thread and mutex for connect - */ - NdbThread* theThreadPtr; - friend void* runConnect(void * me); + + SocketClient *m_socket_client; protected: - /** - * Error reporting from connect thread(s) - */ - void reportThreadError(NodeId nodeId, - TransporterError errorCode); Uint32 getErrorCount(); - TransporterError getThreadError(); - void resetThreadError(); - TransporterError _threadError; - Uint32 _timeOutMillis; - Uint32 _errorCount; - -protected: - NdbMutex* theMutexPtr; - bool _connected; // Are we connected - bool _connecting; // Connect thread is running - bool _disconnecting; // We are disconnecting - - void * callbackObj; + Uint32 m_errorCount; + Uint32 m_timeOutMillis; + +protected: + bool m_connected; // Are we connected + + TransporterRegistry &m_transporter_registry; + void *get_callback_obj() { return m_transporter_registry.callbackObj; }; + void report_disconnect(int err){m_transporter_registry.report_disconnect(remoteNodeId,err);}; + void report_error(enum TransporterError err){reportError(get_callback_obj(),remoteNodeId,err);}; }; inline bool Transporter::isConnected() const { - return _connected; + return m_connected; } inline @@ -138,42 +146,17 @@ Transporter::getRemoteNodeId() const { return remoteNodeId; } -inline -void -Transporter::reportThreadError(NodeId nodeId, TransporterError errorCode) -{ -#if 0 - ndbout_c("Transporter::reportThreadError (NodeId: %d, Error code: %d)", - nodeId, errorCode); -#endif - _threadError = errorCode; - _errorCount++; -} - inline -TransporterError -Transporter::getThreadError(){ - return _threadError; +NodeId +Transporter::getLocalNodeId() const { + return remoteNodeId; } inline Uint32 Transporter::getErrorCount() { - return _errorCount; -} - -inline -void -Transporter::resetThreadError() -{ - _threadError = TE_NO_ERROR; -} - -inline -void -Transporter::setCallbackObject(void * callback) { - callbackObj = callback; + return m_errorCount; } #endif // Define of Transporter_H diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index 3f98eeed89e..bad3b44706f 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -16,10 +16,11 @@ #include <ndb_global.h> -#include "TransporterRegistry.hpp" +#include <TransporterRegistry.hpp> #include "TransporterInternalDefinitions.hpp" #include "Transporter.hpp" +#include <SocketAuthenticator.hpp> #ifdef NDB_TCP_TRANSPORTER #include "TCP_Transporter.hpp" @@ -42,20 +43,67 @@ #include "NdbOut.hpp" #include <NdbSleep.h> #include <NdbTick.h> -#define STEPPING 1 +#include <InputStream.hpp> +#include <OutputStream.hpp> + +SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) +{ + if (m_auth && !m_auth->server_authenticate(sockfd)){ + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + { + // read node id from client + int nodeId; + SocketInputStream s_input(sockfd); + char buf[256]; + if (s_input.gets(buf, 256) == 0) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + if (sscanf(buf, "%d", &nodeId) != 1) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + //check that nodeid is valid and that there is an allocated transporter + if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + if (m_transporter_registry->theTransporters[nodeId] == 0) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + //check that the transporter should be connected + if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) { + NDB_CLOSE_SOCKET(sockfd); + return 0; + } + + Transporter *t= m_transporter_registry->theTransporters[nodeId]; + + // send info about own id (just as response to acnowledge connection) + SocketOutputStream s_output(sockfd); + s_output.println("%d", t->getLocalNodeId()); + + // setup transporter (transporter responsable for closing sockfd) + t->connect_server(sockfd); + } + + return 0; +} TransporterRegistry::TransporterRegistry(void * callback, unsigned _maxTransporters, unsigned sizeOfLongSignalMemory) { + m_transporter_service= 0; nodeIdSpecified = false; maxTransporters = _maxTransporters; sendCounter = 1; - m_ccCount = 0; - m_ccIndex = 0; - m_ccStep = STEPPING; - m_ccReady = false; - m_nTransportersPerformConnect=0; callbackObj=callback; @@ -82,7 +130,7 @@ TransporterRegistry::TransporterRegistry(void * callback, theSHMTransporters[i] = NULL; theOSETransporters[i] = NULL; theTransporters[i] = NULL; - performStates[i] = PerformNothing; + performStates[i] = DISCONNECTED; ioStates[i] = NoHalt; } theOSEReceiver = 0; @@ -154,13 +202,14 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { return false; - TCP_Transporter * t = new TCP_Transporter(config->sendBufferSize, - config->maxReceiveSize, - config->port, - config->remoteHostName, + TCP_Transporter * t = new TCP_Transporter(*this, + config->sendBufferSize, + config->maxReceiveSize, config->localHostName, - config->remoteNodeId, + config->remoteHostName, + config->port, localNodeId, + config->remoteNodeId, config->byteOrder, config->compression, config->checksum, @@ -172,13 +221,11 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { return false; } - t->setCallbackObject(callbackObj); - // Put the transporter in the transporter arrays theTCPTransporters[nTCPTransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nTCPTransporters++; @@ -228,12 +275,11 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { delete t; return false; } - t->setCallbackObject(callbackObj); // Put the transporter in the transporter arrays theOSETransporters[nOSETransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nOSETransporters++; @@ -279,12 +325,11 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { delete t; return false; } - t->setCallbackObject(callbackObj); // Put the transporter in the transporter arrays theSCITransporters[nSCITransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nSCITransporters++; @@ -321,12 +366,11 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { delete t; return false; } - t->setCallbackObject(callbackObj); // Put the transporter in the transporter arrays theSHMTransporters[nSHMTransporters] = t; theTransporters[t->getRemoteNodeId()] = t; theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER; - performStates[t->getRemoteNodeId()] = PerformNothing; + performStates[t->getRemoteNodeId()] = DISCONNECTED; nTransporters++; nSHMTransporters++; @@ -781,7 +825,7 @@ TransporterRegistry::performReceive(){ TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { const int receiveSize = t->doReceive(); if(receiveSize > 0){ @@ -804,7 +848,7 @@ TransporterRegistry::performReceive(){ checkJobBuffer(); SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); @@ -819,7 +863,7 @@ TransporterRegistry::performReceive(){ checkJobBuffer(); SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); @@ -840,7 +884,7 @@ TransporterRegistry::performSend(){ #ifdef NDB_OSE_TRANSPORTER for (int i = 0; i < nOSETransporters; i++){ OSE_Transporter *t = theOSETransporters[i]; - if((performStates[t->getRemoteNodeId()] == PerformIO) && + if((is_connected(t->getRemoteNodeId()) && (t->isConnected())) { t->doSend(); }//if @@ -887,7 +931,7 @@ TransporterRegistry::performSend(){ TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const int socket = t->getSocket(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &writeset)) { t->doSend(); }//if @@ -901,7 +945,7 @@ TransporterRegistry::performSend(){ if (t && (t->hasDataToSend()) && (t->isConnected()) && - (performStates[t->getRemoteNodeId()] == PerformIO)) { + (is_connected(t->getRemoteNodeId()))) { t->doSend(); }//if }//for @@ -910,7 +954,7 @@ TransporterRegistry::performSend(){ if (t && (t->hasDataToSend()) && (t->isConnected()) && - (performStates[t->getRemoteNodeId()] == PerformIO)) { + (is_connected(t->getRemoteNodeId()))) { t->doSend(); }//if }//for @@ -925,7 +969,7 @@ TransporterRegistry::performSend(){ SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); - if(performStates[nodeId] == PerformIO){ + if(is_connected(nodeId)){ if(t->isConnected() && t->hasDataToSend()) { t->doSend(); } //if @@ -961,70 +1005,210 @@ TransporterRegistry::printState(){ } #endif -PerformState -TransporterRegistry::performState(NodeId nodeId) { - return performStates[nodeId]; +IOState +TransporterRegistry::ioState(NodeId nodeId) { + return ioStates[nodeId]; } -#ifdef DEBUG_TRANSPORTER -const char * -performStateString(PerformState state){ - switch(state){ - case PerformNothing: - return "PerformNothing"; - break; - case PerformIO: - return "PerformIO"; +void +TransporterRegistry::setIOState(NodeId nodeId, IOState state) { + DEBUG("TransporterRegistry::setIOState(" + << nodeId << ", " << state << ")"); + ioStates[nodeId] = state; +} + +static void * +run_start_clients_C(void * me) +{ + ((TransporterRegistry*) me)->start_clients_thread(); + NdbThread_Exit(0); + return me; +} + +// Run by kernel thread +void +TransporterRegistry::do_connect(NodeId node_id) +{ + PerformState &curr_state = performStates[node_id]; + switch(curr_state){ + case DISCONNECTED: break; - case PerformConnect: - return "PerformConnect"; + case CONNECTED: + return; + case CONNECTING: + return; + case DISCONNECTING: break; - case PerformDisconnect: - return "PerformDisconnect"; + } + curr_state= CONNECTING; +} +void +TransporterRegistry::do_disconnect(NodeId node_id) +{ + PerformState &curr_state = performStates[node_id]; + switch(curr_state){ + case DISCONNECTED: + return; + case CONNECTED: break; - case RemoveTransporter: - return "RemoveTransporter"; + case CONNECTING: break; + case DISCONNECTING: + return; } - return "Unknown"; + curr_state= DISCONNECTING; } -#endif void -TransporterRegistry::setPerformState(NodeId nodeId, PerformState state) { - DEBUG("TransporterRegistry::setPerformState(" - << nodeId << ", " << performStateString(state) << ")"); - - performStates[nodeId] = state; +TransporterRegistry::report_connect(NodeId node_id) +{ + performStates[node_id] = CONNECTED; + reportConnect(callbackObj, node_id); +} + +void +TransporterRegistry::report_disconnect(NodeId node_id, int errnum) +{ + performStates[node_id] = DISCONNECTED; + reportDisconnect(callbackObj, node_id, errnum); } void -TransporterRegistry::setPerformState(PerformState state) { - int count = 0; - int index = 0; - while(count < nTransporters){ - if(theTransporters[index] != 0){ - setPerformState(theTransporters[index]->getRemoteNodeId(), state); - count ++; +TransporterRegistry::update_connections() +{ + for (int i= 0, n= 0; n < nTransporters; i++){ + Transporter * t = theTransporters[i]; + if (!t) + continue; + n++; + + const NodeId nodeId = t->getRemoteNodeId(); + switch(performStates[nodeId]){ + case CONNECTED: + case DISCONNECTED: + break; + case CONNECTING: + if(t->isConnected()) + report_connect(nodeId); + break; + case DISCONNECTING: + if(!t->isConnected()) + report_disconnect(nodeId, 0); + break; } - index ++; } } -IOState -TransporterRegistry::ioState(NodeId nodeId) { - return ioStates[nodeId]; +// run as own thread +void +TransporterRegistry::start_clients_thread() +{ + while (m_run_start_clients_thread) { + NdbSleep_MilliSleep(100); + for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){ + Transporter * t = theTransporters[i]; + if (!t) + continue; + n++; + + const NodeId nodeId = t->getRemoteNodeId(); + switch(performStates[nodeId]){ + case CONNECTING: + if(!t->isConnected() && !t->isServer) + t->connect_client(); + break; + case DISCONNECTING: + if(t->isConnected()) + t->doDisconnect(); + break; + default: + break; + } + } + } } -void -TransporterRegistry::setIOState(NodeId nodeId, IOState state) { - DEBUG("TransporterRegistry::setIOState(" - << nodeId << ", " << state << ")"); - ioStates[nodeId] = state; +bool +TransporterRegistry::start_clients() +{ + m_run_start_clients_thread= true; + m_start_clients_thread= NdbThread_Create(run_start_clients_C, + (void**)this, + 32768, + "ndb_start_clients", + NDB_THREAD_PRIO_LOW); + if (m_start_clients_thread == 0) { + m_run_start_clients_thread= false; + return false; + } + return true; +} + +bool +TransporterRegistry::stop_clients() +{ + if (m_start_clients_thread) { + m_run_start_clients_thread= false; + void* status; + int r= NdbThread_WaitFor(m_start_clients_thread, &status); + NdbThread_Destroy(&m_start_clients_thread); + } + return true; +} + +bool +TransporterRegistry::start_service(SocketServer& socket_server) +{ +#if 0 + for (int i= 0, n= 0; n < nTransporters; i++){ + Transporter * t = theTransporters[i]; + if (!t) + continue; + n++; + if (t->isServer) { + t->m_service = new TransporterService(new SocketAuthSimple("ndbd passwd")); + if(!socket_server.setup(t->m_service, t->m_r_port, 0)) + { + ndbout_c("Unable to setup transporter service port: %d!\n" + "Please check if the port is already used,\n" + "(perhaps a mgmtsrvrserver is already running)", + m_service_port); + delete t->m_service; + return false; + } + } + } +#endif + + m_transporter_service = new TransporterService(new SocketAuthSimple("ndbd passwd")); + + if (nodeIdSpecified != true) { + ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified"); + return false; + } + + m_service_port = 3307 + localNodeId; + //m_interface_name = "ndbd"; + m_interface_name = 0; + + if(!socket_server.setup(m_transporter_service, m_service_port, m_interface_name)) + { + ndbout_c("Unable to setup transporter service port: %d!\n" + "Please check if the port is already used,\n" + "(perhaps a mgmtsrvrserver is already running)", + m_service_port); + delete m_transporter_service; + return false; + } + + m_transporter_service->setTransporterRegistry(this); + + return true; } void -TransporterRegistry::startReceiving(){ +TransporterRegistry::startReceiving() +{ #ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != NULL){ theOSEReceiver->createPhantom(); @@ -1081,99 +1265,6 @@ TransporterRegistry::stopSending(){ #endif } -/** - * The old implementation did not scale with a large - * number of nodes. (Watchdog killed NDB because - * it took too long time to allocated threads in - * doConnect. - * - * The new implementation only checks the connection - * for a number of transporters (STEPPING), until to - * the point where all transporters has executed - * doConnect once. After that, the behaviour is as - * in the old implemenation, i.e, checking the connection - * for all transporters. - * @todo: instead of STEPPING, maybe we should only - * allow checkConnections to execute for a certain - * time that somehow factors in heartbeat times and - * watchdog times. - * - */ - -void -TransporterRegistry::checkConnections(){ - if(m_ccStep > nTransporters) - m_ccStep = nTransporters; - - while(m_ccCount < m_ccStep){ - if(theTransporters[m_ccIndex] != 0){ - Transporter * t = theTransporters[m_ccIndex]; - const NodeId nodeId = t->getRemoteNodeId(); - if(t->getThreadError() != 0) { - reportError(callbackObj, nodeId, t->getThreadError()); - t->resetThreadError(); - } - - switch(performStates[nodeId]){ - case PerformConnect: - if(!t->isConnected()){ - t->doConnect(); - if(m_nTransportersPerformConnect!=nTransporters) - m_nTransportersPerformConnect++; - - } else { - performStates[nodeId] = PerformIO; - reportConnect(callbackObj, nodeId); - } - break; - case PerformDisconnect: - { - bool wasConnected = t->isConnected(); - t->doDisconnect(); - performStates[nodeId] = PerformNothing; - if(wasConnected){ - reportDisconnect(callbackObj, nodeId,0); - } - } - break; - case RemoveTransporter: - removeTransporter(nodeId); - break; - case PerformNothing: - case PerformIO: - break; - } - m_ccCount ++; - } - m_ccIndex ++; - } - - if(!m_ccReady) { - if(m_ccCount < nTransporters) { - if(nTransporters - m_ccStep < STEPPING) - m_ccStep += nTransporters-m_ccStep; - else - m_ccStep += STEPPING; - - // ndbout_c("count %d step %d ", m_ccCount, m_ccStep); - } - else { - m_ccCount = 0; - m_ccIndex = 0; - m_ccStep = STEPPING; - // ndbout_c("count %d step %d ", m_ccCount, m_ccStep); - } - } - if((nTransporters == m_nTransportersPerformConnect) || m_ccReady) { - m_ccReady = true; - m_ccCount = 0; - m_ccIndex = 0; - m_ccStep = nTransporters; - // ndbout_c("alla count %d step %d ", m_ccCount, m_ccStep); - } - -}//TransporterRegistry::checkConnections() - NdbOut & operator <<(NdbOut & out, SignalHeader & sh){ out << "-- Signal Header --" << endl; out << "theLength: " << sh.theLength << endl; diff --git a/ndb/src/common/util/Makefile.am b/ndb/src/common/util/Makefile.am index 59d9775b8e3..678added01e 100644 --- a/ndb/src/common/util/Makefile.am +++ b/ndb/src/common/util/Makefile.am @@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la libgeneral_la_SOURCES = \ File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \ - SimpleProperties.cpp Parser.cpp InputStream.cpp SocketServer.cpp \ + SimpleProperties.cpp Parser.cpp InputStream.cpp \ + SocketServer.cpp SocketClient.cpp SocketAuthenticator.cpp\ OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \ NdbSqlUtil.cpp new.cpp \ uucode.c random.c getarg.c version.c \ diff --git a/ndb/src/common/util/SocketAuthenticator.cpp b/ndb/src/common/util/SocketAuthenticator.cpp new file mode 100644 index 00000000000..d0abf89b2b1 --- /dev/null +++ b/ndb/src/common/util/SocketAuthenticator.cpp @@ -0,0 +1,63 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include <ndb_global.h> + +#include <SocketClient.hpp> +#include <SocketAuthenticator.hpp> +#include <NdbOut.hpp> + +SocketAuthSimple::SocketAuthSimple(const char *passwd) { + m_passwd= strdup(passwd); + m_buf= (char*)malloc(strlen(passwd)+1); +} + +SocketAuthSimple::~SocketAuthSimple() +{ + if (m_passwd) + free((void*)m_passwd); + if (m_buf) + free(m_buf); +} + +bool SocketAuthSimple::client_authenticate(int sockfd) +{ + if (!m_passwd) + return false; + + int len = strlen(m_passwd); + int r; + r= send(sockfd, m_passwd, len, 0); + + r= recv(sockfd, m_buf, len, 0); + m_buf[r]= '\0'; + + return true; +} + +bool SocketAuthSimple::server_authenticate(int sockfd) +{ + if (!m_passwd) + return false; + + int len = strlen(m_passwd), r; + r= recv(sockfd, m_buf, len, 0); + m_buf[r]= '\0'; + r= send(sockfd, m_passwd, len, 0); + + return true; +} diff --git a/ndb/src/common/util/SocketClient.cpp b/ndb/src/common/util/SocketClient.cpp new file mode 100644 index 00000000000..b7769633875 --- /dev/null +++ b/ndb/src/common/util/SocketClient.cpp @@ -0,0 +1,90 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include <ndb_global.h> +#include <NdbOut.hpp> + +#include <SocketClient.hpp> +#include <SocketAuthenticator.hpp> + +SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa) +{ + m_auth= sa; + m_port= port; + m_server_name= strdup(server_name); + m_sockfd= -1; +} + +SocketClient::~SocketClient() +{ + if (m_server_name) + free(m_server_name); + if (m_sockfd >= 0) + NDB_CLOSE_SOCKET(m_sockfd); + if (m_auth) + delete m_auth; +} + +bool +SocketClient::init() +{ + if (m_sockfd >= 0) + NDB_CLOSE_SOCKET(m_sockfd); + + memset(&m_servaddr, 0, sizeof(m_servaddr)); + m_servaddr.sin_family = AF_INET; + m_servaddr.sin_port = htons(m_port); + // Convert ip address presentation format to numeric format + if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name)) + return false; + + m_sockfd= socket(AF_INET, SOCK_STREAM, 0); + if (m_sockfd == NDB_INVALID_SOCKET) { + return false; + } + + return true; +} + +NDB_SOCKET_TYPE +SocketClient::connect() +{ + if (m_sockfd < 0) + { + if (!init()) { + ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl; + return -1; + } + } + + const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr)); + if (r == -1) + return -1; + + if (m_auth) + if (!m_auth->client_authenticate(m_sockfd)) + { + NDB_CLOSE_SOCKET(m_sockfd); + m_sockfd= -1; + return -1; + } + + NDB_SOCKET_TYPE sockfd= m_sockfd; + m_sockfd= -1; + + return sockfd; +} diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp index a0ec0aaa676..67cbf8aba4a 100644 --- a/ndb/src/common/util/SocketServer.cpp +++ b/ndb/src/common/util/SocketServer.cpp @@ -17,7 +17,7 @@ #include <ndb_global.h> -#include "SocketServer.hpp" +#include <SocketServer.hpp> #include <NdbTCP.h> #include <NdbOut.hpp> |