summaryrefslogtreecommitdiff
path: root/ndb/src/common
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.bredbandsbolaget.se>2004-06-23 00:59:08 +0000
committerunknown <tomas@poseidon.bredbandsbolaget.se>2004-06-23 00:59:08 +0000
commitf44c3ee911dcdd3d56b51d286d7497a2a71f342b (patch)
treecc02037b22a13e1ed0101c3b0af371530863efdc /ndb/src/common
parent190d9e72cbd7fa7b6b54c0fbc07e287b71967bb8 (diff)
parentdc2544fdee9e510614cc7d674726ea388284e57f (diff)
downloadmariadb-git-f44c3ee911dcdd3d56b51d286d7497a2a71f342b.tar.gz
Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into poseidon.bredbandsbolaget.se:/home/tomas/mysql-4.1-ndb ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp: Auto merged ndb/src/kernel/main.cpp: Auto merged ndb/src/mgmsrv/MgmtSrvr.cpp: Auto merged ndb/src/ndbapi/ClusterMgr.cpp: Auto merged ndb/src/ndbapi/TransporterFacade.cpp: Auto merged ndb/src/ndbapi/TransporterFacade.hpp: Auto merged
Diffstat (limited to 'ndb/src/common')
-rw-r--r--ndb/src/common/mgmcommon/ConfigInfo.cpp43
-rw-r--r--ndb/src/common/mgmcommon/ConfigRetriever.cpp24
-rw-r--r--ndb/src/common/mgmcommon/LocalConfig.cpp6
-rw-r--r--ndb/src/common/transporter/TCP_Transporter.cpp250
-rw-r--r--ndb/src/common/transporter/TCP_Transporter.hpp83
-rw-r--r--ndb/src/common/transporter/Transporter.cpp193
-rw-r--r--ndb/src/common/transporter/Transporter.hpp109
-rw-r--r--ndb/src/common/transporter/TransporterRegistry.cpp421
-rw-r--r--ndb/src/common/util/Makefile.am3
-rw-r--r--ndb/src/common/util/SocketAuthenticator.cpp63
-rw-r--r--ndb/src/common/util/SocketClient.cpp90
-rw-r--r--ndb/src/common/util/SocketServer.cpp2
12 files changed, 661 insertions, 626 deletions
diff --git a/ndb/src/common/mgmcommon/ConfigInfo.cpp b/ndb/src/common/mgmcommon/ConfigInfo.cpp
index c2b5fdabf01..a1bd5f39d82 100644
--- a/ndb/src/common/mgmcommon/ConfigInfo.cpp
+++ b/ndb/src/common/mgmcommon/ConfigInfo.cpp
@@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule);
/****************************************************************************
* Config Rules declarations
****************************************************************************/
-bool addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
- struct InitConfigFileParser::Context &ctx,
- const char * ruleData);
+bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
+ struct InitConfigFileParser::Context &ctx,
+ const char * rule_data);
+bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
+ struct InitConfigFileParser::Context &ctx,
+ const char * rule_data);
const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = {
- { addNodeConnections, 0 },
+ { add_node_connections, 0 },
+ { add_db_ports, 0 },
{ 0, 0 }
};
@@ -377,6 +381,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
(MAX_NODES - 1) },
{
+ CFG_DB_SERVER_PORT,
+ "ServerPort",
+ "DB",
+ "Port used to setup transporter",
+ ConfigInfo::USED,
+ false,
+ ConfigInfo::INT,
+ 2202,
+ 0,
+ 0x7FFFFFFF },
+
+ {
CFG_DB_NO_REPLICAS,
"NoOfReplicas",
"DB",
@@ -1231,7 +1247,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::STRING,
- MANDATORY,
+ 0,
0,
0x7FFFFFFF },
@@ -1330,7 +1346,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::STRING,
- MANDATORY,
+ 0,
0,
0x7FFFFFFF },
@@ -2510,10 +2526,14 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){
const char * compId;
if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){
+ require(ctx.m_currentSection->put("HostName", ""));
+ return true;
+#if 0
ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
+#endif
}
const Properties * computer;
@@ -3158,9 +3178,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){
}
bool
-addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
+add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
- const char * ruleData)
+ const char * rule_data)
{
Properties * props= ctx.m_config;
Properties p_connections;
@@ -3241,3 +3261,10 @@ addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
return true;
}
+bool add_db_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
+ struct InitConfigFileParser::Context &ctx,
+ const char * rule_data)
+{
+ return true;
+}
+
diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
index d2c622593de..c34d9bb01f9 100644
--- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp
+++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
@@ -114,7 +114,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
struct ndb_mgm_configuration * p = 0;
switch(m->type){
case MgmId_TCP:
- p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, verId);
+ p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port,
+ verId, nodeType);
break;
case MgmId_File:
p = getConfig(m->data.file.filename, verId);
@@ -155,7 +156,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
ndb_mgm_configuration *
ConfigRetriever::getConfig(const char * mgmhost,
short port,
- int versionId){
+ int versionId,
+ int nodetype){
NdbMgmHandle h;
h = ndb_mgm_create_handle();
@@ -175,6 +177,21 @@ ConfigRetriever::getConfig(const char * mgmhost,
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId);
if(conf == 0){
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
+ ndb_mgm_destroy_handle(&h);
+ return 0;
+ }
+
+ {
+ unsigned nodeid= getOwnNodeId();
+
+ int res= ndb_mgm_alloc_nodeid(h, versionId, &nodeid, nodetype);
+ if(res != 0) {
+ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
+ ndb_mgm_destroy_handle(&h);
+ return 0;
+ }
+
+ _ownNodeId= nodeid;
}
ndb_mgm_disconnect(h);
@@ -329,6 +346,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf,
}
do {
+ if(strlen(hostname) == 0)
+ break;
+
if(strcasecmp(hostname, localhost) == 0)
break;
diff --git a/ndb/src/common/mgmcommon/LocalConfig.cpp b/ndb/src/common/mgmcommon/LocalConfig.cpp
index 12e685ced34..67e92064e81 100644
--- a/ndb/src/common/mgmcommon/LocalConfig.cpp
+++ b/ndb/src/common/mgmcommon/LocalConfig.cpp
@@ -21,6 +21,7 @@
LocalConfig::LocalConfig(){
ids = 0; size = 0; items = 0;
error_line = 0; error_msg[0] = 0;
+ _ownNodeId= 0;
}
bool
@@ -95,6 +96,11 @@ LocalConfig::init(bool onlyNodeId,
return false;
}
+ //7. Check
+ if(readConnectString("host=localhost:2200", onlyNodeId)){
+ return true;
+ }
+
setError(0, "");
return false;
diff --git a/ndb/src/common/transporter/TCP_Transporter.cpp b/ndb/src/common/transporter/TCP_Transporter.cpp
index 99b6a137797..8833b51e236 100644
--- a/ndb/src/common/transporter/TCP_Transporter.cpp
+++ b/ndb/src/common/transporter/TCP_Transporter.cpp
@@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void)
#define ndbstrerror strerror
#endif
-TCP_Transporter::TCP_Transporter(int sendBufSize, int maxRecvSize,
- int portNo,
- const char *rHostName,
+TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
+ int sendBufSize, int maxRecvSize,
const char *lHostName,
- NodeId rNodeId, NodeId lNodeId,
+ const char *rHostName,
+ int r_port,
+ NodeId lNodeId,
+ NodeId rNodeId,
int byte_order,
bool compr, bool chksm, bool signalId,
Uint32 _reportFreq) :
- Transporter(lNodeId, rNodeId, byte_order, compr, chksm, signalId),
- m_sendBuffer(sendBufSize),
- isServer(lNodeId < rNodeId),
- port(portNo)
+ Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
+ byte_order, compr, chksm, signalId),
+ m_sendBuffer(sendBufSize)
{
maxReceiveSize = maxRecvSize;
- strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
-
// Initialize member variables
- Ndb_getInAddr(&remoteHostAddress, rHostName);
-
- Ndb_getInAddr(&localHostAddress, lHostName);
theSocket = NDB_INVALID_SOCKET;
sendCount = receiveCount = 0;
@@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() {
receiveBuffer.destroy();
}
+bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
+{
+ return connect_common(sockfd);
+}
+
+bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
+{
+ return connect_common(sockfd);
+}
+
+bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
+{
+ theSocket = sockfd;
+ setSocketOptions();
+ setSocketNonBlocking(theSocket);
+ return true;
+}
+
bool
TCP_Transporter::initTransporter() {
@@ -316,7 +330,7 @@ TCP_Transporter::doSend() {
sendCount ++;
sendSize += nBytesSent;
if(sendCount == reportFreq){
- reportSendLen(callbackObj,remoteNodeId, sendCount, sendSize);
+ reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
sendCount = 0;
sendSize = 0;
}
@@ -331,7 +345,7 @@ TCP_Transporter::doSend() {
#endif
if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
doDisconnect();
- reportDisconnect(callbackObj, remoteNodeId, InetErrno);
+ report_disconnect(InetErrno);
}
return false;
@@ -361,14 +375,15 @@ TCP_Transporter::doReceive() {
#endif
ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
- reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
+ report_error(TE_INVALID_MESSAGE_LENGTH);
return 0;
}
receiveCount ++;
receiveSize += nBytesRead;
+
if(receiveCount == reportFreq){
- reportReceiveLen(callbackObj, remoteNodeId, receiveCount, receiveSize);
+ reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
receiveCount = 0;
receiveSize = 0;
}
@@ -384,60 +399,17 @@ TCP_Transporter::doReceive() {
if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
// The remote node has closed down
doDisconnect();
- reportDisconnect(callbackObj, remoteNodeId,InetErrno);
+ report_disconnect(InetErrno);
}
}
return nBytesRead;
}
-bool
-TCP_Transporter::connectImpl(Uint32 timeOutMillis){
- struct timeval timeout = {0, 0};
- timeout.tv_sec = timeOutMillis / 1000;
- timeout.tv_usec = (timeOutMillis % 1000)*1000;
-
- bool retVal = false;
-
- if(isServer){
- if(theSocket == NDB_INVALID_SOCKET){
- startTCPServer();
- }
- if(theSocket == NDB_INVALID_SOCKET)
- {
- NdbSleep_MilliSleep(timeOutMillis);
- return false;
- }
- retVal = acceptClient(&timeout);
- } else {
- // Is client
- retVal = connectClient(&timeout);
- }
-
- if(!retVal) {
- NdbSleep_MilliSleep(timeOutMillis);
- return false;
- }
-
-#if defined NDB_OSE || defined NDB_SOFTOSE
- if(setsockopt(theSocket, SOL_SOCKET, SO_OSEOWNER,
- &theReceiverPid, sizeof(PROCESS)) != 0){
-
- ndbout << "Failed to transfer ownership of socket" << endl;
- NDB_CLOSE_SOCKET(theSocket);
- theSocket = -1;
- return false;
- }
-#endif
-
- return true;
-}
-
-
void
-TCP_Transporter::disconnectImpl() {
+TCP_Transporter::disconnectImpl() {
if(theSocket != NDB_INVALID_SOCKET){
if(NDB_CLOSE_SOCKET(theSocket) < 0){
- reportError(callbackObj, remoteNodeId, TE_ERROR_CLOSING_SOCKET);
+ report_error(TE_ERROR_CLOSING_SOCKET);
}
}
@@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() {
theSocket = NDB_INVALID_SOCKET;
}
-
-bool
-TCP_Transporter::startTCPServer() {
-
- int bindResult, listenResult;
-
- // The server variable is the remote server when we are a client
- // htonl and htons returns the parameter in network byte order
- // INADDR_ANY tells the OS kernel to choose the IP address
- struct sockaddr_in server;
- memset((void*)&server, 0, sizeof(server));
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = localHostAddress.s_addr;
- server.sin_port = htons(port);
-
- if (theSocket != NDB_INVALID_SOCKET) {
- return true; // Server socket is already initialized
- }
-
- // Create the socket
- theSocket = socket(AF_INET, SOCK_STREAM, 0);
- if (theSocket == NDB_INVALID_SOCKET) {
- reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
- return false;
- }
-
- // Set the socket reuse addr to true, so we are sure we can bind the
- // socket
- int reuseAddr = 1;
- setsockopt(theSocket, SOL_SOCKET, SO_REUSEADDR,
- (char*)&reuseAddr, sizeof(reuseAddr));
-
- // Set the TCP_NODELAY option so also small packets are sent
- // as soon as possible
- int nodelay = 1;
- setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
- (char*)&nodelay, sizeof(nodelay));
-
- // Bind the socket
- bindResult = bind(theSocket, (struct sockaddr *) &server,
- sizeof(server));
- if (bindResult < 0) {
- reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
- NDB_CLOSE_SOCKET(theSocket);
- theSocket = NDB_INVALID_SOCKET;
- return false;
- }
-
- // Perform listen.
- listenResult = listen(theSocket, 1);
- if (listenResult == 1) {
- reportThreadError(remoteNodeId, TE_LISTEN_FAILED);
- NDB_CLOSE_SOCKET(theSocket);
- theSocket = NDB_INVALID_SOCKET;
- return false;
- }
-
- return true;
-}
-
-
-bool
-TCP_Transporter::acceptClient (struct timeval * timeout){
-
- struct sockaddr_in clientAddress;
-
- fd_set readset;
- FD_ZERO(&readset);
- FD_SET(theSocket, &readset);
- const int res = select(theSocket + 1, &readset, 0, 0, timeout);
- if(res == 0)
- return false;
-
- if(res < 0){
- reportThreadError(remoteNodeId, TE_ERROR_IN_SELECT_BEFORE_ACCEPT);
- return false;
- }
-
- NDB_SOCKLEN_T clientAddressLen = sizeof(clientAddress);
- const NDB_SOCKET_TYPE clientSocket = accept(theSocket,
- (struct sockaddr*)&clientAddress,
- &clientAddressLen);
- if (clientSocket == NDB_INVALID_SOCKET) {
- reportThreadError(remoteNodeId, TE_ACCEPT_RETURN_ERROR);
- return false;
- }
-
- if (clientAddress.sin_addr.s_addr != remoteHostAddress.s_addr) {
- ndbout_c("Wrong client connecting!");
- ndbout_c("connecting address: %s", inet_ntoa(clientAddress.sin_addr));
- ndbout_c("expecting address: %s", inet_ntoa(remoteHostAddress));
- // The newly connected host is not the remote host
- // we wanted to connect to. Disconnect it.
- // XXX This is not valid. We cannot disconnect it.
- NDB_CLOSE_SOCKET(clientSocket);
- return false;
- } else {
- NDB_CLOSE_SOCKET(theSocket);
- theSocket = clientSocket;
- setSocketOptions();
- setSocketNonBlocking(theSocket);
- return true;
- }
-}
-
-bool
-TCP_Transporter::connectClient (struct timeval * timeout){
-
- // Create the socket
- theSocket = socket(AF_INET, SOCK_STREAM, 0);
- if (theSocket == NDB_INVALID_SOCKET) {
- reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
- return false;
- }
-
- struct sockaddr_in server;
- memset((void*)&server, 0, sizeof(server));
- server.sin_family = AF_INET;
- server.sin_addr = remoteHostAddress;
- server.sin_port = htons(port);
-
- struct sockaddr_in client;
- memset((void*)&client, 0, sizeof(client));
- client.sin_family = AF_INET;
- client.sin_addr = localHostAddress;
- client.sin_port = 0; // Any port
-
- // Bind the socket
- const int bindResult = bind(theSocket, (struct sockaddr *) &client,
- sizeof(client));
- if (bindResult < 0) {
- reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
- NDB_CLOSE_SOCKET(theSocket);
- theSocket = NDB_INVALID_SOCKET;
- return false;
- }
-
- const int connectRes = ::connect(theSocket, (struct sockaddr *) &server,
- sizeof(server));
- if(connectRes == 0){
- setSocketOptions();
- setSocketNonBlocking(theSocket);
- return true;
- }
-
- NDB_CLOSE_SOCKET(theSocket);
- theSocket = NDB_INVALID_SOCKET;
- return false;
-}
-
-
-
diff --git a/ndb/src/common/transporter/TCP_Transporter.hpp b/ndb/src/common/transporter/TCP_Transporter.hpp
index 30b730a5b1c..958cfde03a1 100644
--- a/ndb/src/common/transporter/TCP_Transporter.hpp
+++ b/ndb/src/common/transporter/TCP_Transporter.hpp
@@ -14,24 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-//****************************************************************************
-//
-// AUTHOR
-// Åsa Fransson
-//
-// NAME
-// TCP_Transporter
-//
-// DESCRIPTION
-// A TCP_Transporter instance is created when TCP/IP-communication
-// shall be used (user specified). It handles connect, disconnect,
-// send and receive.
-//
-//
-//
-//***************************************************************************/
-#ifndef TCP_Transporter_H
-#define TCP_Transporter_H
+#ifndef TCP_TRANSPORTER_HPP
+#define TCP_TRANSPORTER_HPP
#include "Transporter.hpp"
#include "SendBuffer.hpp"
@@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter {
friend class TransporterRegistry;
private:
// Initialize member variables
- TCP_Transporter(int sendBufferSize, int maxReceiveSize,
- int port,
- const char *rHostName,
+ TCP_Transporter(TransporterRegistry&,
+ int sendBufferSize, int maxReceiveSize,
const char *lHostName,
- NodeId rHostId, NodeId lHostId,
+ const char *rHostName,
+ int r_port,
+ NodeId lHostId,
+ NodeId rHostId,
int byteorder,
bool compression, bool checksum, bool signalId,
Uint32 reportFreq = 4096);
@@ -121,12 +107,14 @@ protected:
* A client connects to the remote server
* A server accepts any new connections
*/
- bool connectImpl(Uint32 timeOutMillis);
+ virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
+ virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
+ bool connect_common(NDB_SOCKET_TYPE sockfd);
/**
* Disconnects a TCP/IP node. Empty send and receivebuffer.
*/
- void disconnectImpl();
+ virtual void disconnectImpl();
private:
/**
@@ -134,22 +122,12 @@ private:
*/
SendBuffer m_sendBuffer;
- const bool isServer;
- const unsigned int port;
-
// Sending/Receiving socket used by both client and server
NDB_SOCKET_TYPE theSocket;
Uint32 maxReceiveSize;
/**
- * Remote host name/and address
- */
- char remoteHostName[256];
- struct in_addr remoteHostAddress;
- struct in_addr localHostAddress;
-
- /**
* Socket options
*/
int sockOptRcvBufSize;
@@ -164,43 +142,6 @@ private:
bool sendIsPossible(struct timeval * timeout);
/**
- * startTCPServer - None blocking
- *
- * create a server socket
- * bind
- * listen
- *
- * Note: Does not call accept
- */
- bool startTCPServer();
-
- /**
- * acceptClient - Blocking
- *
- * Accept a connection
- * checks if "right" client has connected
- * if so
- * close server socket
- * else
- * close newly created socket and goto begin
- */
- bool acceptClient(struct timeval * timeout);
-
- /**
- * Creates a client socket
- *
- * Note does not call connect
- */
- bool createClientSocket();
-
- /**
- * connectClient - Blocking
- *
- * connects to remote host
- */
- bool connectClient(struct timeval * timeout);
-
- /**
* Statistics
*/
Uint32 reportFreq;
diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp
index 5ca523d5185..c6f93d2cbea 100644
--- a/ndb/src/common/transporter/Transporter.cpp
+++ b/ndb/src/common/transporter/Transporter.cpp
@@ -15,132 +15,125 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include <TransporterRegistry.hpp>
+#include <TransporterCallback.hpp>
#include "Transporter.hpp"
#include "TransporterInternalDefinitions.hpp"
#include <NdbSleep.h>
-
-Transporter::Transporter(NodeId lNodeId, NodeId rNodeId,
+#include <SocketAuthenticator.hpp>
+#include <InputStream.hpp>
+#include <OutputStream.hpp>
+
+Transporter::Transporter(TransporterRegistry &t_reg,
+ const char *lHostName,
+ const char *rHostName,
+ int r_port,
+ NodeId lNodeId,
+ NodeId rNodeId,
int _byteorder,
bool _compression, bool _checksum, bool _signalId)
- : localNodeId(lNodeId), remoteNodeId(rNodeId),
- m_packer(_signalId, _checksum)
+ : m_r_port(r_port), localNodeId(lNodeId), remoteNodeId(rNodeId),
+ isServer(lNodeId < rNodeId),
+ m_packer(_signalId, _checksum),
+ m_transporter_registry(t_reg)
{
+ if (rHostName && strlen(rHostName) > 0){
+ strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
+ Ndb_getInAddr(&remoteHostAddress, rHostName);
+ }
+ else
+ {
+ if (!isServer) {
+ ndbout << "Unable to setup transporter. Node " << rNodeId
+ << " must have hostname. Update configuration." << endl;
+ exit(-1);
+ }
+ remoteHostName[0]= 0;
+ }
+ strncpy(localHostName, lHostName, sizeof(localHostName));
+
+ if (strlen(lHostName) > 0)
+ Ndb_getInAddr(&localHostAddress, lHostName);
+
byteOrder = _byteorder;
compressionUsed = _compression;
checksumUsed = _checksum;
signalIdUsed = _signalId;
- _threadError = TE_NO_ERROR;
-
- _connecting = false;
- _disconnecting = false;
- _connected = false;
- _timeOutMillis = 1000;
- theThreadPtr = NULL;
- theMutexPtr = NdbMutex_Create();
-}
-
-Transporter::~Transporter(){
- NdbMutex_Destroy(theMutexPtr);
+ m_connected = false;
+ m_timeOutMillis = 1000;
- if(theThreadPtr != 0){
- void * retVal;
- NdbThread_WaitFor(theThreadPtr, &retVal);
- NdbThread_Destroy(&theThreadPtr);
+ if (isServer)
+ m_socket_client= 0;
+ else
+ {
+ unsigned short tmp_port= 3307+rNodeId;
+ m_socket_client= new SocketClient(remoteHostName, tmp_port,
+ new SocketAuthSimple("ndbd passwd"));
}
}
-extern "C"
-void *
-runConnect_C(void * me)
-{
- runConnect(me);
- NdbThread_Exit(0);
- return NULL;
-}
-
-void *
-runConnect(void * me){
- Transporter * t = (Transporter *) me;
-
- DEBUG("Connect thread to " << t->remoteNodeId << " started");
-
- while(true){
- NdbMutex_Lock(t->theMutexPtr);
- if(t->_disconnecting){
- t->_connecting = false;
- NdbMutex_Unlock(t->theMutexPtr);
- DEBUG("Connect Thread " << t->remoteNodeId << " stop due to disconnect");
- return 0;
- }
- NdbMutex_Unlock(t->theMutexPtr);
-
- bool res = t->connectImpl(t->_timeOutMillis); // 1000 ms
- DEBUG("Waiting for " << t->remoteNodeId << "...");
- if(res){
- t->_connected = true;
- t->_connecting = false;
- t->_errorCount = 0;
- t->_threadError = TE_NO_ERROR;
- DEBUG("Connect Thread " << t->remoteNodeId << " stop due to connect");
- return 0;
- }
- }
+Transporter::~Transporter(){
+ if (m_socket_client)
+ delete m_socket_client;
}
-void
-Transporter::doConnect() {
+bool
+Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
+ if(m_connected)
+ return true; // TODO assert(0);
- NdbMutex_Lock(theMutexPtr);
- if(_connecting || _disconnecting || _connected){
- NdbMutex_Unlock(theMutexPtr);
- return;
+ bool res = connect_server_impl(sockfd);
+ if(res){
+ m_connected = true;
+ m_errorCount = 0;
}
-
- _connecting = true;
- _threadError = TE_NO_ERROR;
+ return res;
+}
- // Start thread
+bool
+Transporter::connect_client() {
+ if(m_connected)
+ return true;
+
+ NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
- char buf[16];
- snprintf(buf, sizeof(buf), "ndb_con_%d", remoteNodeId);
+ if (sockfd < 0)
+ return false;
+
+ // send info about own id
+ SocketOutputStream s_output(sockfd);
+ s_output.println("%d", localNodeId);
+
+ // get remote id
+ int nodeId;
+ SocketInputStream s_input(sockfd);
+ char buf[256];
+ if (s_input.gets(buf, 256) == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return false;
+ }
+ if (sscanf(buf, "%d", &nodeId) != 1) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return false;
+ }
- if(theThreadPtr != 0){
- void * retVal;
- NdbThread_WaitFor(theThreadPtr, &retVal);
- NdbThread_Destroy(&theThreadPtr);
+ bool res = connect_client_impl(sockfd);
+ if(res){
+ m_connected = true;
+ m_errorCount = 0;
}
-
- theThreadPtr = NdbThread_Create(runConnect_C,
- (void**)this,
- 32768,
- buf,
- NDB_THREAD_PRIO_LOW);
-
- NdbSleep_MilliSleep(100); // Let thread start
-
- NdbMutex_Unlock(theMutexPtr);
+ return res;
}
void
-Transporter::doDisconnect() {
-
- NdbMutex_Lock(theMutexPtr);
- _disconnecting = true;
- while(_connecting){
- DEBUG("Waiting for connect to finish...");
-
- NdbMutex_Unlock(theMutexPtr);
- NdbSleep_MilliSleep(500);
- NdbMutex_Lock(theMutexPtr);
- }
-
- _connected = false;
-
+Transporter::doDisconnect() {
+
+ if(!m_connected)
+ return; //assert(0); TODO will fail
+
disconnectImpl();
- _threadError = TE_NO_ERROR;
- _disconnecting = false;
-
- NdbMutex_Unlock(theMutexPtr);
+
+ m_connected= false;
}
diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp
index 43b26d45899..9a39f8788bc 100644
--- a/ndb/src/common/transporter/Transporter.hpp
+++ b/ndb/src/common/transporter/Transporter.hpp
@@ -19,6 +19,9 @@
#include <ndb_global.h>
+#include <SocketClient.hpp>
+
+#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "TransporterDefinitions.hpp"
#include "Packer.hpp"
@@ -40,8 +43,9 @@ public:
* None blocking
* Use isConnected() to check status
*/
- virtual void doConnect();
-
+ bool connect_client();
+ bool connect_server(NDB_SOCKET_TYPE socket);
+
/**
* Blocking
*/
@@ -60,14 +64,17 @@ public:
*/
NodeId getRemoteNodeId() const;
-
/**
- * Set callback object
+ * Local (own) Node Id
*/
- void setCallbackObject(void * callback);
+ NodeId getLocalNodeId() const;
protected:
- Transporter(NodeId lNodeId,
+ Transporter(TransporterRegistry &,
+ const char *lHostName,
+ const char *rHostName,
+ int r_port,
+ NodeId lNodeId,
NodeId rNodeId,
int byteorder,
bool compression,
@@ -78,58 +85,59 @@ protected:
* Blocking, for max timeOut milli seconds
* Returns true if connect succeded
*/
- virtual bool connectImpl(Uint32 timeOut) = 0;
+ virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
+ virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
/**
* Blocking
*/
virtual void disconnectImpl() = 0;
- const NodeId localNodeId;
+ /**
+ * Remote host name/and address
+ */
+ char remoteHostName[256];
+ char localHostName[256];
+ struct in_addr remoteHostAddress;
+ struct in_addr localHostAddress;
+
+ const unsigned int m_r_port;
+
const NodeId remoteNodeId;
+ const NodeId localNodeId;
+ const bool isServer;
+
unsigned createIndex;
int byteOrder;
bool compressionUsed;
bool checksumUsed;
bool signalIdUsed;
- Packer m_packer;
-
+ Packer m_packer;
private:
- /**
- * Thread and mutex for connect
- */
- NdbThread* theThreadPtr;
- friend void* runConnect(void * me);
+
+ SocketClient *m_socket_client;
protected:
- /**
- * Error reporting from connect thread(s)
- */
- void reportThreadError(NodeId nodeId,
- TransporterError errorCode);
Uint32 getErrorCount();
- TransporterError getThreadError();
- void resetThreadError();
- TransporterError _threadError;
- Uint32 _timeOutMillis;
- Uint32 _errorCount;
-
-protected:
- NdbMutex* theMutexPtr;
- bool _connected; // Are we connected
- bool _connecting; // Connect thread is running
- bool _disconnecting; // We are disconnecting
-
- void * callbackObj;
+ Uint32 m_errorCount;
+ Uint32 m_timeOutMillis;
+
+protected:
+ bool m_connected; // Are we connected
+
+ TransporterRegistry &m_transporter_registry;
+ void *get_callback_obj() { return m_transporter_registry.callbackObj; };
+ void report_disconnect(int err){m_transporter_registry.report_disconnect(remoteNodeId,err);};
+ void report_error(enum TransporterError err){reportError(get_callback_obj(),remoteNodeId,err);};
};
inline
bool
Transporter::isConnected() const {
- return _connected;
+ return m_connected;
}
inline
@@ -138,42 +146,17 @@ Transporter::getRemoteNodeId() const {
return remoteNodeId;
}
-inline
-void
-Transporter::reportThreadError(NodeId nodeId, TransporterError errorCode)
-{
-#if 0
- ndbout_c("Transporter::reportThreadError (NodeId: %d, Error code: %d)",
- nodeId, errorCode);
-#endif
- _threadError = errorCode;
- _errorCount++;
-}
-
inline
-TransporterError
-Transporter::getThreadError(){
- return _threadError;
+NodeId
+Transporter::getLocalNodeId() const {
+ return remoteNodeId;
}
inline
Uint32
Transporter::getErrorCount()
{
- return _errorCount;
-}
-
-inline
-void
-Transporter::resetThreadError()
-{
- _threadError = TE_NO_ERROR;
-}
-
-inline
-void
-Transporter::setCallbackObject(void * callback) {
- callbackObj = callback;
+ return m_errorCount;
}
#endif // Define of Transporter_H
diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp
index 3f98eeed89e..bad3b44706f 100644
--- a/ndb/src/common/transporter/TransporterRegistry.cpp
+++ b/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -16,10 +16,11 @@
#include <ndb_global.h>
-#include "TransporterRegistry.hpp"
+#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"
#include "Transporter.hpp"
+#include <SocketAuthenticator.hpp>
#ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp"
@@ -42,20 +43,67 @@
#include "NdbOut.hpp"
#include <NdbSleep.h>
#include <NdbTick.h>
-#define STEPPING 1
+#include <InputStream.hpp>
+#include <OutputStream.hpp>
+
+SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
+{
+ if (m_auth && !m_auth->server_authenticate(sockfd)){
+ NDB_CLOSE_SOCKET(sockfd);
+ return 0;
+ }
+
+ {
+ // read node id from client
+ int nodeId;
+ SocketInputStream s_input(sockfd);
+ char buf[256];
+ if (s_input.gets(buf, 256) == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return 0;
+ }
+ if (sscanf(buf, "%d", &nodeId) != 1) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return 0;
+ }
+
+ //check that nodeid is valid and that there is an allocated transporter
+ if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return 0;
+ }
+ if (m_transporter_registry->theTransporters[nodeId] == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return 0;
+ }
+
+ //check that the transporter should be connected
+ if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) {
+ NDB_CLOSE_SOCKET(sockfd);
+ return 0;
+ }
+
+ Transporter *t= m_transporter_registry->theTransporters[nodeId];
+
+ // send info about own id (just as response to acnowledge connection)
+ SocketOutputStream s_output(sockfd);
+ s_output.println("%d", t->getLocalNodeId());
+
+ // setup transporter (transporter responsable for closing sockfd)
+ t->connect_server(sockfd);
+ }
+
+ return 0;
+}
TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) {
+ m_transporter_service= 0;
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
- m_ccCount = 0;
- m_ccIndex = 0;
- m_ccStep = STEPPING;
- m_ccReady = false;
- m_nTransportersPerformConnect=0;
callbackObj=callback;
@@ -82,7 +130,7 @@ TransporterRegistry::TransporterRegistry(void * callback,
theSHMTransporters[i] = NULL;
theOSETransporters[i] = NULL;
theTransporters[i] = NULL;
- performStates[i] = PerformNothing;
+ performStates[i] = DISCONNECTED;
ioStates[i] = NoHalt;
}
theOSEReceiver = 0;
@@ -154,13 +202,14 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false;
- TCP_Transporter * t = new TCP_Transporter(config->sendBufferSize,
- config->maxReceiveSize,
- config->port,
- config->remoteHostName,
+ TCP_Transporter * t = new TCP_Transporter(*this,
+ config->sendBufferSize,
+ config->maxReceiveSize,
config->localHostName,
- config->remoteNodeId,
+ config->remoteHostName,
+ config->port,
localNodeId,
+ config->remoteNodeId,
config->byteOrder,
config->compression,
config->checksum,
@@ -172,13 +221,11 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false;
}
- t->setCallbackObject(callbackObj);
-
// Put the transporter in the transporter arrays
theTCPTransporters[nTCPTransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
- performStates[t->getRemoteNodeId()] = PerformNothing;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nTCPTransporters++;
@@ -228,12 +275,11 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
delete t;
return false;
}
- t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theOSETransporters[nOSETransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
- performStates[t->getRemoteNodeId()] = PerformNothing;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nOSETransporters++;
@@ -279,12 +325,11 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
delete t;
return false;
}
- t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theSCITransporters[nSCITransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
- performStates[t->getRemoteNodeId()] = PerformNothing;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nSCITransporters++;
@@ -321,12 +366,11 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
delete t;
return false;
}
- t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theSHMTransporters[nSHMTransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
- performStates[t->getRemoteNodeId()] = PerformNothing;
+ performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nSHMTransporters++;
@@ -781,7 +825,7 @@ TransporterRegistry::performReceive(){
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket();
- if(performStates[nodeId] == PerformIO){
+ if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) {
const int receiveSize = t->doReceive();
if(receiveSize > 0){
@@ -804,7 +848,7 @@ TransporterRegistry::performReceive(){
checkJobBuffer();
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
- if(performStates[nodeId] == PerformIO){
+ if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
@@ -819,7 +863,7 @@ TransporterRegistry::performReceive(){
checkJobBuffer();
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
- if(performStates[nodeId] == PerformIO){
+ if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
@@ -840,7 +884,7 @@ TransporterRegistry::performSend(){
#ifdef NDB_OSE_TRANSPORTER
for (int i = 0; i < nOSETransporters; i++){
OSE_Transporter *t = theOSETransporters[i];
- if((performStates[t->getRemoteNodeId()] == PerformIO) &&
+ if((is_connected(t->getRemoteNodeId()) &&
(t->isConnected())) {
t->doSend();
}//if
@@ -887,7 +931,7 @@ TransporterRegistry::performSend(){
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const int socket = t->getSocket();
- if(performStates[nodeId] == PerformIO){
+ if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &writeset)) {
t->doSend();
}//if
@@ -901,7 +945,7 @@ TransporterRegistry::performSend(){
if (t &&
(t->hasDataToSend()) &&
(t->isConnected()) &&
- (performStates[t->getRemoteNodeId()] == PerformIO)) {
+ (is_connected(t->getRemoteNodeId()))) {
t->doSend();
}//if
}//for
@@ -910,7 +954,7 @@ TransporterRegistry::performSend(){
if (t &&
(t->hasDataToSend()) &&
(t->isConnected()) &&
- (performStates[t->getRemoteNodeId()] == PerformIO)) {
+ (is_connected(t->getRemoteNodeId()))) {
t->doSend();
}//if
}//for
@@ -925,7 +969,7 @@ TransporterRegistry::performSend(){
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
- if(performStates[nodeId] == PerformIO){
+ if(is_connected(nodeId)){
if(t->isConnected() && t->hasDataToSend()) {
t->doSend();
} //if
@@ -961,70 +1005,210 @@ TransporterRegistry::printState(){
}
#endif
-PerformState
-TransporterRegistry::performState(NodeId nodeId) {
- return performStates[nodeId];
+IOState
+TransporterRegistry::ioState(NodeId nodeId) {
+ return ioStates[nodeId];
}
-#ifdef DEBUG_TRANSPORTER
-const char *
-performStateString(PerformState state){
- switch(state){
- case PerformNothing:
- return "PerformNothing";
- break;
- case PerformIO:
- return "PerformIO";
+void
+TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
+ DEBUG("TransporterRegistry::setIOState("
+ << nodeId << ", " << state << ")");
+ ioStates[nodeId] = state;
+}
+
+static void *
+run_start_clients_C(void * me)
+{
+ ((TransporterRegistry*) me)->start_clients_thread();
+ NdbThread_Exit(0);
+ return me;
+}
+
+// Run by kernel thread
+void
+TransporterRegistry::do_connect(NodeId node_id)
+{
+ PerformState &curr_state = performStates[node_id];
+ switch(curr_state){
+ case DISCONNECTED:
break;
- case PerformConnect:
- return "PerformConnect";
+ case CONNECTED:
+ return;
+ case CONNECTING:
+ return;
+ case DISCONNECTING:
break;
- case PerformDisconnect:
- return "PerformDisconnect";
+ }
+ curr_state= CONNECTING;
+}
+void
+TransporterRegistry::do_disconnect(NodeId node_id)
+{
+ PerformState &curr_state = performStates[node_id];
+ switch(curr_state){
+ case DISCONNECTED:
+ return;
+ case CONNECTED:
break;
- case RemoveTransporter:
- return "RemoveTransporter";
+ case CONNECTING:
break;
+ case DISCONNECTING:
+ return;
}
- return "Unknown";
+ curr_state= DISCONNECTING;
}
-#endif
void
-TransporterRegistry::setPerformState(NodeId nodeId, PerformState state) {
- DEBUG("TransporterRegistry::setPerformState("
- << nodeId << ", " << performStateString(state) << ")");
-
- performStates[nodeId] = state;
+TransporterRegistry::report_connect(NodeId node_id)
+{
+ performStates[node_id] = CONNECTED;
+ reportConnect(callbackObj, node_id);
+}
+
+void
+TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
+{
+ performStates[node_id] = DISCONNECTED;
+ reportDisconnect(callbackObj, node_id, errnum);
}
void
-TransporterRegistry::setPerformState(PerformState state) {
- int count = 0;
- int index = 0;
- while(count < nTransporters){
- if(theTransporters[index] != 0){
- setPerformState(theTransporters[index]->getRemoteNodeId(), state);
- count ++;
+TransporterRegistry::update_connections()
+{
+ for (int i= 0, n= 0; n < nTransporters; i++){
+ Transporter * t = theTransporters[i];
+ if (!t)
+ continue;
+ n++;
+
+ const NodeId nodeId = t->getRemoteNodeId();
+ switch(performStates[nodeId]){
+ case CONNECTED:
+ case DISCONNECTED:
+ break;
+ case CONNECTING:
+ if(t->isConnected())
+ report_connect(nodeId);
+ break;
+ case DISCONNECTING:
+ if(!t->isConnected())
+ report_disconnect(nodeId, 0);
+ break;
}
- index ++;
}
}
-IOState
-TransporterRegistry::ioState(NodeId nodeId) {
- return ioStates[nodeId];
+// run as own thread
+void
+TransporterRegistry::start_clients_thread()
+{
+ while (m_run_start_clients_thread) {
+ NdbSleep_MilliSleep(100);
+ for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
+ Transporter * t = theTransporters[i];
+ if (!t)
+ continue;
+ n++;
+
+ const NodeId nodeId = t->getRemoteNodeId();
+ switch(performStates[nodeId]){
+ case CONNECTING:
+ if(!t->isConnected() && !t->isServer)
+ t->connect_client();
+ break;
+ case DISCONNECTING:
+ if(t->isConnected())
+ t->doDisconnect();
+ break;
+ default:
+ break;
+ }
+ }
+ }
}
-void
-TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
- DEBUG("TransporterRegistry::setIOState("
- << nodeId << ", " << state << ")");
- ioStates[nodeId] = state;
+bool
+TransporterRegistry::start_clients()
+{
+ m_run_start_clients_thread= true;
+ m_start_clients_thread= NdbThread_Create(run_start_clients_C,
+ (void**)this,
+ 32768,
+ "ndb_start_clients",
+ NDB_THREAD_PRIO_LOW);
+ if (m_start_clients_thread == 0) {
+ m_run_start_clients_thread= false;
+ return false;
+ }
+ return true;
+}
+
+bool
+TransporterRegistry::stop_clients()
+{
+ if (m_start_clients_thread) {
+ m_run_start_clients_thread= false;
+ void* status;
+ int r= NdbThread_WaitFor(m_start_clients_thread, &status);
+ NdbThread_Destroy(&m_start_clients_thread);
+ }
+ return true;
+}
+
+bool
+TransporterRegistry::start_service(SocketServer& socket_server)
+{
+#if 0
+ for (int i= 0, n= 0; n < nTransporters; i++){
+ Transporter * t = theTransporters[i];
+ if (!t)
+ continue;
+ n++;
+ if (t->isServer) {
+ t->m_service = new TransporterService(new SocketAuthSimple("ndbd passwd"));
+ if(!socket_server.setup(t->m_service, t->m_r_port, 0))
+ {
+ ndbout_c("Unable to setup transporter service port: %d!\n"
+ "Please check if the port is already used,\n"
+ "(perhaps a mgmtsrvrserver is already running)",
+ m_service_port);
+ delete t->m_service;
+ return false;
+ }
+ }
+ }
+#endif
+
+ m_transporter_service = new TransporterService(new SocketAuthSimple("ndbd passwd"));
+
+ if (nodeIdSpecified != true) {
+ ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
+ return false;
+ }
+
+ m_service_port = 3307 + localNodeId;
+ //m_interface_name = "ndbd";
+ m_interface_name = 0;
+
+ if(!socket_server.setup(m_transporter_service, m_service_port, m_interface_name))
+ {
+ ndbout_c("Unable to setup transporter service port: %d!\n"
+ "Please check if the port is already used,\n"
+ "(perhaps a mgmtsrvrserver is already running)",
+ m_service_port);
+ delete m_transporter_service;
+ return false;
+ }
+
+ m_transporter_service->setTransporterRegistry(this);
+
+ return true;
}
void
-TransporterRegistry::startReceiving(){
+TransporterRegistry::startReceiving()
+{
#ifdef NDB_OSE_TRANSPORTER
if(theOSEReceiver != NULL){
theOSEReceiver->createPhantom();
@@ -1081,99 +1265,6 @@ TransporterRegistry::stopSending(){
#endif
}
-/**
- * The old implementation did not scale with a large
- * number of nodes. (Watchdog killed NDB because
- * it took too long time to allocated threads in
- * doConnect.
- *
- * The new implementation only checks the connection
- * for a number of transporters (STEPPING), until to
- * the point where all transporters has executed
- * doConnect once. After that, the behaviour is as
- * in the old implemenation, i.e, checking the connection
- * for all transporters.
- * @todo: instead of STEPPING, maybe we should only
- * allow checkConnections to execute for a certain
- * time that somehow factors in heartbeat times and
- * watchdog times.
- *
- */
-
-void
-TransporterRegistry::checkConnections(){
- if(m_ccStep > nTransporters)
- m_ccStep = nTransporters;
-
- while(m_ccCount < m_ccStep){
- if(theTransporters[m_ccIndex] != 0){
- Transporter * t = theTransporters[m_ccIndex];
- const NodeId nodeId = t->getRemoteNodeId();
- if(t->getThreadError() != 0) {
- reportError(callbackObj, nodeId, t->getThreadError());
- t->resetThreadError();
- }
-
- switch(performStates[nodeId]){
- case PerformConnect:
- if(!t->isConnected()){
- t->doConnect();
- if(m_nTransportersPerformConnect!=nTransporters)
- m_nTransportersPerformConnect++;
-
- } else {
- performStates[nodeId] = PerformIO;
- reportConnect(callbackObj, nodeId);
- }
- break;
- case PerformDisconnect:
- {
- bool wasConnected = t->isConnected();
- t->doDisconnect();
- performStates[nodeId] = PerformNothing;
- if(wasConnected){
- reportDisconnect(callbackObj, nodeId,0);
- }
- }
- break;
- case RemoveTransporter:
- removeTransporter(nodeId);
- break;
- case PerformNothing:
- case PerformIO:
- break;
- }
- m_ccCount ++;
- }
- m_ccIndex ++;
- }
-
- if(!m_ccReady) {
- if(m_ccCount < nTransporters) {
- if(nTransporters - m_ccStep < STEPPING)
- m_ccStep += nTransporters-m_ccStep;
- else
- m_ccStep += STEPPING;
-
- // ndbout_c("count %d step %d ", m_ccCount, m_ccStep);
- }
- else {
- m_ccCount = 0;
- m_ccIndex = 0;
- m_ccStep = STEPPING;
- // ndbout_c("count %d step %d ", m_ccCount, m_ccStep);
- }
- }
- if((nTransporters == m_nTransportersPerformConnect) || m_ccReady) {
- m_ccReady = true;
- m_ccCount = 0;
- m_ccIndex = 0;
- m_ccStep = nTransporters;
- // ndbout_c("alla count %d step %d ", m_ccCount, m_ccStep);
- }
-
-}//TransporterRegistry::checkConnections()
-
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
out << "-- Signal Header --" << endl;
out << "theLength: " << sh.theLength << endl;
diff --git a/ndb/src/common/util/Makefile.am b/ndb/src/common/util/Makefile.am
index 59d9775b8e3..678added01e 100644
--- a/ndb/src/common/util/Makefile.am
+++ b/ndb/src/common/util/Makefile.am
@@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la
libgeneral_la_SOURCES = \
File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \
- SimpleProperties.cpp Parser.cpp InputStream.cpp SocketServer.cpp \
+ SimpleProperties.cpp Parser.cpp InputStream.cpp \
+ SocketServer.cpp SocketClient.cpp SocketAuthenticator.cpp\
OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \
NdbSqlUtil.cpp new.cpp \
uucode.c random.c getarg.c version.c \
diff --git a/ndb/src/common/util/SocketAuthenticator.cpp b/ndb/src/common/util/SocketAuthenticator.cpp
new file mode 100644
index 00000000000..d0abf89b2b1
--- /dev/null
+++ b/ndb/src/common/util/SocketAuthenticator.cpp
@@ -0,0 +1,63 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <ndb_global.h>
+
+#include <SocketClient.hpp>
+#include <SocketAuthenticator.hpp>
+#include <NdbOut.hpp>
+
+SocketAuthSimple::SocketAuthSimple(const char *passwd) {
+ m_passwd= strdup(passwd);
+ m_buf= (char*)malloc(strlen(passwd)+1);
+}
+
+SocketAuthSimple::~SocketAuthSimple()
+{
+ if (m_passwd)
+ free((void*)m_passwd);
+ if (m_buf)
+ free(m_buf);
+}
+
+bool SocketAuthSimple::client_authenticate(int sockfd)
+{
+ if (!m_passwd)
+ return false;
+
+ int len = strlen(m_passwd);
+ int r;
+ r= send(sockfd, m_passwd, len, 0);
+
+ r= recv(sockfd, m_buf, len, 0);
+ m_buf[r]= '\0';
+
+ return true;
+}
+
+bool SocketAuthSimple::server_authenticate(int sockfd)
+{
+ if (!m_passwd)
+ return false;
+
+ int len = strlen(m_passwd), r;
+ r= recv(sockfd, m_buf, len, 0);
+ m_buf[r]= '\0';
+ r= send(sockfd, m_passwd, len, 0);
+
+ return true;
+}
diff --git a/ndb/src/common/util/SocketClient.cpp b/ndb/src/common/util/SocketClient.cpp
new file mode 100644
index 00000000000..b7769633875
--- /dev/null
+++ b/ndb/src/common/util/SocketClient.cpp
@@ -0,0 +1,90 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <ndb_global.h>
+#include <NdbOut.hpp>
+
+#include <SocketClient.hpp>
+#include <SocketAuthenticator.hpp>
+
+SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa)
+{
+ m_auth= sa;
+ m_port= port;
+ m_server_name= strdup(server_name);
+ m_sockfd= -1;
+}
+
+SocketClient::~SocketClient()
+{
+ if (m_server_name)
+ free(m_server_name);
+ if (m_sockfd >= 0)
+ NDB_CLOSE_SOCKET(m_sockfd);
+ if (m_auth)
+ delete m_auth;
+}
+
+bool
+SocketClient::init()
+{
+ if (m_sockfd >= 0)
+ NDB_CLOSE_SOCKET(m_sockfd);
+
+ memset(&m_servaddr, 0, sizeof(m_servaddr));
+ m_servaddr.sin_family = AF_INET;
+ m_servaddr.sin_port = htons(m_port);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
+ return false;
+
+ m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
+ if (m_sockfd == NDB_INVALID_SOCKET) {
+ return false;
+ }
+
+ return true;
+}
+
+NDB_SOCKET_TYPE
+SocketClient::connect()
+{
+ if (m_sockfd < 0)
+ {
+ if (!init()) {
+ ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl;
+ return -1;
+ }
+ }
+
+ const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
+ if (r == -1)
+ return -1;
+
+ if (m_auth)
+ if (!m_auth->client_authenticate(m_sockfd))
+ {
+ NDB_CLOSE_SOCKET(m_sockfd);
+ m_sockfd= -1;
+ return -1;
+ }
+
+ NDB_SOCKET_TYPE sockfd= m_sockfd;
+ m_sockfd= -1;
+
+ return sockfd;
+}
diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp
index a0ec0aaa676..67cbf8aba4a 100644
--- a/ndb/src/common/util/SocketServer.cpp
+++ b/ndb/src/common/util/SocketServer.cpp
@@ -17,7 +17,7 @@
#include <ndb_global.h>
-#include "SocketServer.hpp"
+#include <SocketServer.hpp>
#include <NdbTCP.h>
#include <NdbOut.hpp>